diff --git a/Makefile b/Makefile index 2d252b26..7c74c007 100644 --- a/Makefile +++ b/Makefile @@ -37,6 +37,7 @@ dev: deps IRI_API_ADAPTER_account=app.demo_adapter.DemoAdapter \ IRI_API_ADAPTER_compute=app.demo_adapter.DemoAdapter \ IRI_API_ADAPTER_filesystem=app.demo_adapter.DemoAdapter \ + IRI_API_ADAPTER_storage=app.demo_adapter.DemoAdapter \ IRI_API_ADAPTER_task=app.demo_adapter.DemoAdapter \ IRI_LOG_FILE="$${IRI_LOG_FILE:-$${LOG_FILE:-$(IRI_LOG_FILE)}}" \ IRI_LOG_ROTATION_DAYS="$${IRI_LOG_ROTATION_DAYS:-$${LOG_ROTATION_DAYS:-$(IRI_LOG_ROTATION_DAYS)}}" \ diff --git a/README.md b/README.md index c2412665..b500a21b 100644 --- a/README.md +++ b/README.md @@ -86,6 +86,21 @@ Links to data, created by this api, will concatenate these values producing link - `IRI_API_PARAMS`: as described above, this is a way to customize the API meta-data - `IRI_API_ADAPTER_*`: these values specify the business logic for the per-api-group implementation of a facility_adapter. For example: `IRI_API_ADAPTER_status=myfacility.MyFacilityStatusAdapter` would load the implementation of the `app.routers.status.facility_adapter.FacilityAdapter` abstract class to handle the `status` business logic for your facility. + + The full list of router adapters and the abstract base class each must implement: + + | Variable | Mounted at | Abstract base class your adapter must subclass | + |---|---|---| + | `IRI_API_ADAPTER_facility` | `/facility/...` | [`app.routers.facility.facility_adapter.FacilityAdapter`](app/routers/facility/facility_adapter.py) | + | `IRI_API_ADAPTER_status` | `/status/...` | [`app.routers.status.facility_adapter.FacilityAdapter`](app/routers/status/facility_adapter.py) | + | `IRI_API_ADAPTER_account` | `/account/...` | [`app.routers.account.facility_adapter.FacilityAdapter`](app/routers/account/facility_adapter.py) | + | `IRI_API_ADAPTER_compute` | `/compute/...` | [`app.routers.compute.facility_adapter.FacilityAdapter`](app/routers/compute/facility_adapter.py) | + | `IRI_API_ADAPTER_filesystem` | `/filesystem/...` | [`app.routers.filesystem.facility_adapter.FacilityAdapter`](app/routers/filesystem/facility_adapter.py) | + | `IRI_API_ADAPTER_storage` | `/storage/...` | [`app.routers.storage.facility_adapter.FacilityAdapter`](app/routers/storage/facility_adapter.py) | + | `IRI_API_ADAPTER_task` | `/task/...` | [`app.routers.task.facility_adapter.FacilityAdapter`](app/routers/task/facility_adapter.py) | + + Each value is a `module.path.ClassName` string. `app.demo_adapter.DemoAdapter` implements all of them and is what `make dev` wires up by default. A router whose `IRI_API_ADAPTER_*` is not set is hidden from the API at startup unless `IRI_SHOW_MISSING_ROUTES=true`. + - `IRI_SHOW_MISSING_ROUTES`: hide api groups that don't have an `IRI_API_ADAPTER_*` environment variable defined, if set to `true`. This way if your facility only wishes to expose some api groups but not others, they can be hidden. (Defaults to `false`.) ### Logging diff --git a/app/demo_adapter.py b/app/demo_adapter.py index fa9a23ae..f0bd2acd 100644 --- a/app/demo_adapter.py +++ b/app/demo_adapter.py @@ -26,6 +26,8 @@ from .routers.facility import models as facility_models from .routers.filesystem import facility_adapter as filesystem_adapter from .routers.filesystem import models as filesystem_models +from .routers.storage import facility_adapter as storage_adapter +from .routers.storage import models as storage_models from .routers.status import facility_adapter as status_adapter from .routers.status import models as status_models from .routers.task import facility_adapter as task_adapter @@ -96,7 +98,9 @@ def utc_timestamp() -> int: class DemoAdapter( - status_adapter.FacilityAdapter, account_adapter.FacilityAdapter, compute_adapter.FacilityAdapter, filesystem_adapter.FacilityAdapter, task_adapter.FacilityAdapter, facility_adapter.FacilityAdapter + status_adapter.FacilityAdapter, account_adapter.FacilityAdapter, compute_adapter.FacilityAdapter, + filesystem_adapter.FacilityAdapter, storage_adapter.FacilityAdapter, + task_adapter.FacilityAdapter, facility_adapter.FacilityAdapter ): """A demo implementation of the FacilityAdapter that returns hardcoded data.""" def __init__(self): @@ -109,7 +113,7 @@ def __init__(self): self.project_allocations = [] self.user_allocations = [] self.facility = {} - self.locations = [] + self.locations = {} # resource_id -> list[StorageInstance templates] self.sites = [] self._init_state() @@ -243,6 +247,154 @@ def _init_state(self): self.resources = [pm, hpss, cfs, login, iris, sfapi] + _rw = storage_models.AccessPermissions(read=True, write=True, execute=True) + _ro = storage_models.AccessPermissions(read=True, write=False, execute=True) + + # Paths use {user}, {first} (first letter of username), and {project} as placeholders. + # Project-scoped entries (containing {project}) are expanded per-project at query time. + # Each resource_id carries the access semantics for its own context — a compute + # resource shows in-job permissions, a login/DTN/Globus resource shows what that + # endpoint can do. There is no separate access_outside_of_job field. + + # Perlmutter compute nodes: in-job semantics. Home is read-only inside a job; + # archive (HPSS) is not accessible from compute, so it isn't mounted here at all. + self.locations[pm.id] = [ + storage_models.StorageInstance( + logical_name=storage_models.LogicalName.home, + path="/global/homes/{first}/{user}", + access=_ro, + filesystem="gpfs-homes", + performance_tier="medium", + quota_bytes=40 * 1024**3, + available_bytes=28 * 1024**3, + purge_policy_days=None, + shared=False, + ), + storage_models.StorageInstance( + logical_name=storage_models.LogicalName.scratch, + path="/pscratch/sd/{first}/{user}", + access=_rw, + filesystem="lustre-scratch", + performance_tier="high", + quota_bytes=20 * 1024**4, + available_bytes=14 * 1024**4, + purge_policy_days=30, + shared=False, + ), + storage_models.StorageInstance( + logical_name=storage_models.LogicalName.project, + path="/global/project/projectdirs/{project}/{user}", + access=_rw, + filesystem="gpfs-project", + performance_tier="medium", + quota_bytes=2 * 1024**4, + available_bytes=1024**4, + purge_policy_days=None, + shared=True, + ), + storage_models.StorageInstance( + logical_name=storage_models.LogicalName.campaign, + path="/global/cfs/cdirs/{project}/campaign/{user}", + access=_rw, + filesystem="gpfs-cfs", + performance_tier="medium", + quota_bytes=10 * 1024**4, + available_bytes=8 * 1024**4, + purge_policy_days=120, + shared=True, + ), + ] + + # HPSS tape system: archive only; user accesses it through this resource_id + # (typically via login nodes or htar). Archive is rw from this resource. + self.locations[hpss.id] = [ + storage_models.StorageInstance( + logical_name=storage_models.LogicalName.archive, + path="/home/{first}/{user}", + access=_rw, + filesystem="hpss", + performance_tier="tape", + quota_bytes=None, + available_bytes=None, + purge_policy_days=None, + shared=False, + ), + ] + + # CFS / GPFS resource (queried via login nodes / DTN-style endpoint): all tiers rw, + # shared is read-only because it's the project-shared landing area. + self.locations[cfs.id] = [ + storage_models.StorageInstance( + logical_name=storage_models.LogicalName.home, + path="/global/homes/{first}/{user}", + access=_rw, + filesystem="gpfs-homes", + performance_tier="medium", + quota_bytes=40 * 1024**3, + available_bytes=28 * 1024**3, + purge_policy_days=None, + shared=False, + ), + storage_models.StorageInstance( + logical_name=storage_models.LogicalName.scratch, + path="/pscratch/sd/{first}/{user}", + access=_rw, + filesystem="lustre-scratch", + performance_tier="high", + quota_bytes=20 * 1024**4, + available_bytes=14 * 1024**4, + purge_policy_days=30, + shared=False, + ), + storage_models.StorageInstance( + logical_name=storage_models.LogicalName.project, + path="/global/project/projectdirs/{project}/{user}", + access=_rw, + filesystem="gpfs-project", + performance_tier="medium", + quota_bytes=2 * 1024**4, + available_bytes=1024**4, + purge_policy_days=None, + shared=True, + ), + storage_models.StorageInstance( + logical_name=storage_models.LogicalName.campaign, + path="/global/cfs/cdirs/{project}/campaign/{user}", + access=_rw, + filesystem="gpfs-cfs", + performance_tier="medium", + quota_bytes=10 * 1024**4, + available_bytes=8 * 1024**4, + purge_policy_days=120, + shared=True, + ), + storage_models.StorageInstance( + logical_name=storage_models.LogicalName.shared, + path="/global/cfs/cdirs/{project}/shared", + access=_ro, + filesystem="gpfs-cfs", + performance_tier="medium", + quota_bytes=None, + available_bytes=None, + purge_policy_days=None, + shared=True, + ), + storage_models.StorageInstance( + logical_name=storage_models.LogicalName.temporary, + path="/tmp/{user}", + access=_rw, + filesystem="tmpfs", + performance_tier="high", + quota_bytes=512 * 1024**3, + available_bytes=480 * 1024**3, + purge_policy_days=7, + shared=False, + ), + ] + + # Login nodes: same filesystem layout as CFS — outside-of-job semantics for everything. + self.locations[login.id] = self.locations[cfs.id] + # Populate site resource_ids based on which resources are at each site site1.resource_ids = [r.id for r in self.resources if r.site_id == site1.id] site2.resource_ids = [r.id for r in self.resources if r.site_id == site2.id] @@ -623,6 +775,90 @@ async def cancel_job( # call slurm/etc. to cancel job return True +# ---------------------------------------------- +# Storage API +# ---------------------------------------------- + + @staticmethod + def _slugify_project(name: str) -> str: + """Convert a project name to a path-safe slug (real facilities use codes like 'm1234').""" + return name.lower().replace(" ", "_") + + def _user_project_codes(self, user: User) -> list[str]: + """Return the path-slug codes of all projects the user belongs to.""" + return [self._slugify_project(p.name) for p in self.projects if user.id in p.user_ids] + + def _user_member_of(self, user: User, project_code: str) -> bool: + """Authorization check: is the user a member of the named project?""" + return any( + user.id in p.user_ids and self._slugify_project(p.name) == project_code + for p in self.projects + ) + + def _resolve_path(self, template: str, user: User, project: str | None) -> str: + first = user.id[0] if user.id else "u" + path = template.replace("{user}", user.id).replace("{first}", first) + if project: + path = path.replace("{project}", project) + return path + + def _apply_intent_filter( + self, + instance: storage_models.StorageInstance, + intent: storage_models.StorageIntent | None, + ) -> bool: + """Return False if this storage instance should be excluded for the given intent.""" + if intent == storage_models.StorageIntent.long_term_storage: + return instance.logical_name == storage_models.LogicalName.archive + if intent == storage_models.StorageIntent.staging: + return instance.logical_name != storage_models.LogicalName.archive + if intent == storage_models.StorageIntent.write: + return instance.access.write + return True + + async def get_locations( + self, + resource: status_models.Resource, + user: User, + logicalpath: storage_models.LogicalName | None, + project: str | None, + allocation: str | None, + intent: storage_models.StorageIntent | None, + ) -> list[storage_models.StorageInstance]: + templates = self.locations.get(resource.id, []) + effective_project = project or allocation + + # Authorization: a user can only resolve paths for their own projects + if effective_project and not self._user_member_of(user, effective_project): + raise HTTPException(status_code=403, detail=f"User is not a member of project '{effective_project}'") + + # Expand project-scoped paths across ALL of the user's projects when none specified + project_codes = [effective_project] if effective_project else self._user_project_codes(user) + + result = [] + for m in templates: + if logicalpath and m.logical_name != logicalpath: + continue + if not self._apply_intent_filter(m, intent): + continue + + is_project_scoped = "{project}" in m.path + expand_over = project_codes if is_project_scoped else [None] + + for code in expand_over: + result.append(storage_models.StorageInstance( + logical_name=m.logical_name, + path=self._resolve_path(m.path, user, code), + filesystem=m.filesystem, + performance_tier=m.performance_tier, + quota_bytes=m.quota_bytes, + available_bytes=m.available_bytes, + purge_policy_days=m.purge_policy_days, + shared=m.shared, + access=m.access, + )) + return result + def validate_path(self, path: str, allow_symlinks: bool = True) -> str: """Validate that the given path is within the sandbox base directory and optionally check for symlinks.""" basedir = PathSandbox.get_base_temp_dir() diff --git a/app/main.py b/app/main.py index 1f6faccc..475cfa86 100644 --- a/app/main.py +++ b/app/main.py @@ -25,6 +25,7 @@ from app.routers.account import account from app.routers.compute import compute from app.routers.filesystem import filesystem +from app.routers.storage import storage from app.routers.task import task configure_logging(config.LOG_LEVEL) @@ -81,6 +82,7 @@ async def dispatch(self, request: Request, call_next): APP.include_router(account.router, prefix=api_prefix) APP.include_router(compute.router, prefix=api_prefix) APP.include_router(filesystem.router, prefix=api_prefix) +APP.include_router(storage.router, prefix=api_prefix) APP.include_router(task.router, prefix=api_prefix) logging.getLogger().info(f"API path: {api_prefix}") diff --git a/app/routers/storage/__init__.py b/app/routers/storage/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/app/routers/storage/facility_adapter.py b/app/routers/storage/facility_adapter.py new file mode 100644 index 00000000..d650dc58 --- /dev/null +++ b/app/routers/storage/facility_adapter.py @@ -0,0 +1,38 @@ +from abc import abstractmethod + +from ...types.user import User +from ..status import models as status_models +from . import models as storage_models +from ..iri_router import AuthenticatedAdapter + + +class FacilityAdapter(AuthenticatedAdapter): + """ + Facility-specific storage location adapter. + Use the `IRI_API_ADAPTER_storage` environment variable + (defaults to `app.demo_adapter.DemoAdapter`) to install your implementation. + """ + + @abstractmethod + async def get_locations( + self, + resource: status_models.Resource, + user: User, + logicalpath: storage_models.LogicalName | None, + project: str | None, + allocation: str | None, + intent: storage_models.StorageIntent | None, + ) -> list[storage_models.StorageInstance]: + """ + Return resolved storage paths for the user at the given resource. The returned + instances also capture the access semantics of that resource context, so callers + do not need a separate mounts endpoint. + Results are optionally filtered by logical name, project/allocation, and intent. + + Intent semantics: + - staging: exclude archive (too slow for staging workflows) + - long-term-storage: return only archive + - write: exclude paths that are read-only in a job context + - read: no filtering (all accessible paths) + """ + pass diff --git a/app/routers/storage/models.py b/app/routers/storage/models.py new file mode 100644 index 00000000..4aaa7c81 --- /dev/null +++ b/app/routers/storage/models.py @@ -0,0 +1,79 @@ +"""Models for storage location and mount API endpoints.""" +from enum import Enum +from pydantic import Field, BaseModel + + +class LogicalName(str, Enum): + """Well-known logical filesystem tier names across HPC facilities.""" + home = "home" + scratch = "scratch" + project = "project" + campaign = "campaign" + archive = "archive" + shared = "shared" + temporary = "temporary" + + +class StorageIntent(str, Enum): + """Intended use hint to filter returned storage locations.""" + read = "read" + write = "write" + staging = "staging" + long_term_storage = "long-term-storage" + + +class AccessPermissions(BaseModel): + """POSIX-style access permissions for a storage location.""" + read: bool = Field(..., description="Read permission", example=True) + write: bool = Field(..., description="Write permission", example=True) + execute: bool = Field(..., description="Execute/traverse permission", example=True) + + +class StorageInstance(BaseModel): + """ + A concrete storage instance visible through a resource for a given logical filesystem tier. + """ + logical_name: LogicalName = Field( + ..., + description="Logical filesystem tier name", + example="scratch", + ) + path: str = Field( + ..., + description="Absolute resolved path for this user at the resource", + example="/pscratch/sd/j/jbalcas", + ) + filesystem: str | None = Field( + default=None, + description="Underlying filesystem type or label", + example="lustre-scratch", + ) + performance_tier: str | None = Field( + default=None, + description="Performance tier classification (high / medium / low / tape)", + example="high", + ) + quota_bytes: int | None = Field( + default=None, + description="Total quota in bytes (None = unlimited or unknown)", + example=5000000000000, + ) + available_bytes: int | None = Field( + default=None, + description="Available bytes remaining within the quota", + example=4200000000000, + ) + purge_policy_days: int | None = Field( + default=None, + description="Days of inactivity before automatic purge; None means no purge policy", + example=30, + ) + shared: bool = Field( + default=False, + description="True if the path is shared across multiple users or projects", + example=False, + ) + access: AccessPermissions = Field( + ..., + description="Access permissions through the queried resource context", + ) diff --git a/app/routers/storage/storage.py b/app/routers/storage/storage.py new file mode 100644 index 00000000..cdfc89ac --- /dev/null +++ b/app/routers/storage/storage.py @@ -0,0 +1,69 @@ +from typing import Annotated + +from fastapi import Depends, HTTPException, Query, Request, status + +from ...types.http import forbidExtraQueryParams +from ...types.user import User +from .. import iri_router +from ..error_handlers import DEFAULT_RESPONSES +from ..iri_meta import iri_meta_dict +from ..status.status import router as status_router +from . import facility_adapter, models + + +router = iri_router.IriRouter( + facility_adapter.FacilityAdapter, + prefix="/storage", + tags=["storage"], +) + + +@router.get( + "/locations/{resource_id}", + summary="Get resolved storage locations for a resource", + description=( + "Return the resolved storage paths for the authenticated user at the specified " + "resource. The response includes both placement and access semantics for that " + "resource context, so it can be used in place of a separate mounts endpoint. " + "Optionally filter by logical name, project/allocation, and intent.\n\n" + "Intent semantics:\n" + "- `staging`: excludes archive (too slow for staging workflows)\n" + "- `long-term-storage`: returns only archive\n" + "- `write`: excludes paths that are read-only in a compute-job context\n" + "- `read`: no filtering\n" + ), + status_code=status.HTTP_200_OK, + response_model=list[models.StorageInstance], + responses=DEFAULT_RESPONSES, + operation_id="getStorageLocations", + openapi_extra=iri_meta_dict("in_development", "optional"), +) +async def get_locations( + resource_id: str, + request: Request, + logicalpath: Annotated[ + models.LogicalName | None, + Query(description="Filter to a specific logical filesystem tier"), + ] = None, + project: Annotated[ + str | None, + Query(description="Project or allocation identifier for project-scoped paths"), + ] = None, + allocation: Annotated[ + str | None, + Query(description="Allocation identifier (alternative to project)"), + ] = None, + intent: Annotated[ + models.StorageIntent | None, + Query(description="Intended use to guide which locations are returned"), + ] = None, + user: User = Depends(router.current_user), + _forbid=Depends(forbidExtraQueryParams("logicalpath", "project", "allocation", "intent")), +) -> list[models.StorageInstance]: + resource = await status_router.adapter.get_resource(resource_id) + if not resource: + raise HTTPException(status_code=404, detail="Resource not found") + locations = await router.adapter.get_locations(resource, user, logicalpath, project, allocation, intent) + if logicalpath and not locations: + raise HTTPException(status_code=404, detail=f"No storage location found for logical name '{logicalpath}'") + return locations diff --git a/test/test_storage.py b/test/test_storage.py new file mode 100644 index 00000000..cad737e6 --- /dev/null +++ b/test/test_storage.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python3 +"""Focused regression tests for the remaining storage endpoint contract and OpenAPI wiring.""" + +import asyncio +import os +import unittest + +os.environ.setdefault("IRI_SHOW_MISSING_ROUTES", "true") + +from app.demo_adapter import DemoAdapter +from app.main import APP +from app.routers.storage import models as storage_models + + +class StorageEndpointTests(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.adapter = DemoAdapter() + cls.user = cls.adapter.user + cls.openapi = APP.openapi() + + @classmethod + def _resource(cls, group: str, name: str): + for resource in cls.adapter.resources: + if resource.group == group and resource.name == name: + return resource + raise AssertionError(f"Unable to find resource {group}/{name}") + + def test_resolved_locations_return_shared_storage_instance_shape(self): + compute_resource = self._resource("perlmutter", "compute nodes") + + payload = asyncio.run( + self.adapter.get_locations( + compute_resource, + self.user, + None, + None, + None, + None, + ) + ) + + self.assertGreater(len(payload), 0) + self.assertTrue(all(isinstance(item, storage_models.StorageInstance) for item in payload)) + + first = payload[0].model_dump() + self.assertEqual( + set(first.keys()), + { + "logical_name", + "path", + "filesystem", + "performance_tier", + "quota_bytes", + "available_bytes", + "purge_policy_days", + "shared", + "access", + }, + ) + home_entries = [entry for entry in payload if entry.logical_name == storage_models.LogicalName.home] + self.assertEqual(len(home_entries), 1) + self.assertFalse(home_entries[0].access.write) + + def test_project_scoped_entries_expand_under_remaining_locations_endpoint(self): + login_resource = self._resource("perlmutter", "login nodes") + + location_payload = asyncio.run( + self.adapter.get_locations( + login_resource, + self.user, + storage_models.LogicalName.shared, + None, + None, + None, + ) + ) + + self.assertGreater(len(location_payload), 0) + self.assertTrue( + all(entry.logical_name == storage_models.LogicalName.shared for entry in location_payload) + ) + self.assertTrue( + all(not entry.access.write for entry in location_payload) + ) + self.assertEqual(len(location_payload), len(self.adapter._user_project_codes(self.user))) + + def test_openapi_exposes_only_resource_scoped_storage_locations(self): + resolved_locations = self.openapi["paths"]["/api/v1/storage/locations/{resource_id}"]["get"] + self.assertNotIn("/api/v1/storage/locations", self.openapi["paths"]) + self.assertNotIn("/api/v1/storage/mounts/{resource_id}", self.openapi["paths"]) + self.assertTrue( + resolved_locations["responses"]["200"]["content"]["application/json"]["schema"]["items"]["$ref"].endswith( + "/StorageInstance" + ) + ) + + +if __name__ == "__main__": + unittest.main()