From 243baed4599fcae2405c7c6fba24a4f72e7b4dcc Mon Sep 17 00:00:00 2001 From: Justas Balcas Date: Tue, 12 May 2026 05:07:11 -0500 Subject: [PATCH 1/2] Logical file name/directory normalization. Have a way to identify home/project/scratch/etc dir --- Makefile | 1 + README.md | 15 ++ app/demo_adapter.py | 279 +++++++++++++++++++++++- app/main.py | 2 + app/routers/storage/__init__.py | 0 app/routers/storage/facility_adapter.py | 59 +++++ app/routers/storage/models.py | 134 ++++++++++++ app/routers/storage/storage.py | 124 +++++++++++ 8 files changed, 612 insertions(+), 2 deletions(-) create mode 100644 app/routers/storage/__init__.py create mode 100644 app/routers/storage/facility_adapter.py create mode 100644 app/routers/storage/models.py create mode 100644 app/routers/storage/storage.py diff --git a/Makefile b/Makefile index 2d252b26..7c74c007 100644 --- a/Makefile +++ b/Makefile @@ -37,6 +37,7 @@ dev: deps IRI_API_ADAPTER_account=app.demo_adapter.DemoAdapter \ IRI_API_ADAPTER_compute=app.demo_adapter.DemoAdapter \ IRI_API_ADAPTER_filesystem=app.demo_adapter.DemoAdapter \ + IRI_API_ADAPTER_storage=app.demo_adapter.DemoAdapter \ IRI_API_ADAPTER_task=app.demo_adapter.DemoAdapter \ IRI_LOG_FILE="$${IRI_LOG_FILE:-$${LOG_FILE:-$(IRI_LOG_FILE)}}" \ IRI_LOG_ROTATION_DAYS="$${IRI_LOG_ROTATION_DAYS:-$${LOG_ROTATION_DAYS:-$(IRI_LOG_ROTATION_DAYS)}}" \ diff --git a/README.md b/README.md index c2412665..b500a21b 100644 --- a/README.md +++ b/README.md @@ -86,6 +86,21 @@ Links to data, created by this api, will concatenate these values producing link - `IRI_API_PARAMS`: as described above, this is a way to customize the API meta-data - `IRI_API_ADAPTER_*`: these values specify the business logic for the per-api-group implementation of a facility_adapter. For example: `IRI_API_ADAPTER_status=myfacility.MyFacilityStatusAdapter` would load the implementation of the `app.routers.status.facility_adapter.FacilityAdapter` abstract class to handle the `status` business logic for your facility. + + The full list of router adapters and the abstract base class each must implement: + + | Variable | Mounted at | Abstract base class your adapter must subclass | + |---|---|---| + | `IRI_API_ADAPTER_facility` | `/facility/...` | [`app.routers.facility.facility_adapter.FacilityAdapter`](app/routers/facility/facility_adapter.py) | + | `IRI_API_ADAPTER_status` | `/status/...` | [`app.routers.status.facility_adapter.FacilityAdapter`](app/routers/status/facility_adapter.py) | + | `IRI_API_ADAPTER_account` | `/account/...` | [`app.routers.account.facility_adapter.FacilityAdapter`](app/routers/account/facility_adapter.py) | + | `IRI_API_ADAPTER_compute` | `/compute/...` | [`app.routers.compute.facility_adapter.FacilityAdapter`](app/routers/compute/facility_adapter.py) | + | `IRI_API_ADAPTER_filesystem` | `/filesystem/...` | [`app.routers.filesystem.facility_adapter.FacilityAdapter`](app/routers/filesystem/facility_adapter.py) | + | `IRI_API_ADAPTER_storage` | `/storage/...` | [`app.routers.storage.facility_adapter.FacilityAdapter`](app/routers/storage/facility_adapter.py) | + | `IRI_API_ADAPTER_task` | `/task/...` | [`app.routers.task.facility_adapter.FacilityAdapter`](app/routers/task/facility_adapter.py) | + + Each value is a `module.path.ClassName` string. `app.demo_adapter.DemoAdapter` implements all of them and is what `make dev` wires up by default. A router whose `IRI_API_ADAPTER_*` is not set is hidden from the API at startup unless `IRI_SHOW_MISSING_ROUTES=true`. + - `IRI_SHOW_MISSING_ROUTES`: hide api groups that don't have an `IRI_API_ADAPTER_*` environment variable defined, if set to `true`. This way if your facility only wishes to expose some api groups but not others, they can be hidden. (Defaults to `false`.) ### Logging diff --git a/app/demo_adapter.py b/app/demo_adapter.py index fa9a23ae..75faa13a 100644 --- a/app/demo_adapter.py +++ b/app/demo_adapter.py @@ -26,6 +26,8 @@ from .routers.facility import models as facility_models from .routers.filesystem import facility_adapter as filesystem_adapter from .routers.filesystem import models as filesystem_models +from .routers.storage import facility_adapter as storage_adapter +from .routers.storage import models as storage_models from .routers.status import facility_adapter as status_adapter from .routers.status import models as status_models from .routers.task import facility_adapter as task_adapter @@ -96,7 +98,9 @@ def utc_timestamp() -> int: class DemoAdapter( - status_adapter.FacilityAdapter, account_adapter.FacilityAdapter, compute_adapter.FacilityAdapter, filesystem_adapter.FacilityAdapter, task_adapter.FacilityAdapter, facility_adapter.FacilityAdapter + status_adapter.FacilityAdapter, account_adapter.FacilityAdapter, compute_adapter.FacilityAdapter, + filesystem_adapter.FacilityAdapter, storage_adapter.FacilityAdapter, + task_adapter.FacilityAdapter, facility_adapter.FacilityAdapter ): """A demo implementation of the FacilityAdapter that returns hardcoded data.""" def __init__(self): @@ -109,7 +113,7 @@ def __init__(self): self.project_allocations = [] self.user_allocations = [] self.facility = {} - self.locations = [] + self.locations = {} # resource_id -> list[StorageMount] self.sites = [] self._init_state() @@ -243,6 +247,154 @@ def _init_state(self): self.resources = [pm, hpss, cfs, login, iris, sfapi] + _rw = storage_models.AccessPermissions(read=True, write=True, execute=True) + _ro = storage_models.AccessPermissions(read=True, write=False, execute=True) + + # Paths use {user}, {first} (first letter of username), and {project} as placeholders. + # Project-scoped entries (containing {project}) are expanded per-project at query time. + # Each resource_id carries the access semantics for its own context — a compute + # resource shows in-job permissions, a login/DTN/Globus resource shows what that + # endpoint can do. There is no separate access_outside_of_job field. + + # Perlmutter compute nodes: in-job semantics. Home is read-only inside a job; + # archive (HPSS) is not accessible from compute, so it isn't mounted here at all. + self.locations[pm.id] = [ + storage_models.StorageMount( + logical_name=storage_models.LogicalName.home, + path="/global/homes/{first}/{user}", + access=_ro, + filesystem="gpfs-homes", + performance_tier="medium", + quota_bytes=40 * 1024**3, + available_bytes=28 * 1024**3, + purge_policy_days=None, + shared=False, + ), + storage_models.StorageMount( + logical_name=storage_models.LogicalName.scratch, + path="/pscratch/sd/{first}/{user}", + access=_rw, + filesystem="lustre-scratch", + performance_tier="high", + quota_bytes=20 * 1024**4, + available_bytes=14 * 1024**4, + purge_policy_days=30, + shared=False, + ), + storage_models.StorageMount( + logical_name=storage_models.LogicalName.project, + path="/global/project/projectdirs/{project}/{user}", + access=_rw, + filesystem="gpfs-project", + performance_tier="medium", + quota_bytes=2 * 1024**4, + available_bytes=1024**4, + purge_policy_days=None, + shared=True, + ), + storage_models.StorageMount( + logical_name=storage_models.LogicalName.campaign, + path="/global/cfs/cdirs/{project}/campaign/{user}", + access=_rw, + filesystem="gpfs-cfs", + performance_tier="medium", + quota_bytes=10 * 1024**4, + available_bytes=8 * 1024**4, + purge_policy_days=120, + shared=True, + ), + ] + + # HPSS tape system: archive only; user accesses it through this resource_id + # (typically via login nodes or htar). Archive is rw from this resource. + self.locations[hpss.id] = [ + storage_models.StorageMount( + logical_name=storage_models.LogicalName.archive, + path="/home/{first}/{user}", + access=_rw, + filesystem="hpss", + performance_tier="tape", + quota_bytes=None, + available_bytes=None, + purge_policy_days=None, + shared=False, + ), + ] + + # CFS / GPFS resource (queried via login nodes / DTN-style endpoint): all tiers rw, + # shared is read-only because it's the project-shared landing area. + self.locations[cfs.id] = [ + storage_models.StorageMount( + logical_name=storage_models.LogicalName.home, + path="/global/homes/{first}/{user}", + access=_rw, + filesystem="gpfs-homes", + performance_tier="medium", + quota_bytes=40 * 1024**3, + available_bytes=28 * 1024**3, + purge_policy_days=None, + shared=False, + ), + storage_models.StorageMount( + logical_name=storage_models.LogicalName.scratch, + path="/pscratch/sd/{first}/{user}", + access=_rw, + filesystem="lustre-scratch", + performance_tier="high", + quota_bytes=20 * 1024**4, + available_bytes=14 * 1024**4, + purge_policy_days=30, + shared=False, + ), + storage_models.StorageMount( + logical_name=storage_models.LogicalName.project, + path="/global/project/projectdirs/{project}/{user}", + access=_rw, + filesystem="gpfs-project", + performance_tier="medium", + quota_bytes=2 * 1024**4, + available_bytes=1024**4, + purge_policy_days=None, + shared=True, + ), + storage_models.StorageMount( + logical_name=storage_models.LogicalName.campaign, + path="/global/cfs/cdirs/{project}/campaign/{user}", + access=_rw, + filesystem="gpfs-cfs", + performance_tier="medium", + quota_bytes=10 * 1024**4, + available_bytes=8 * 1024**4, + purge_policy_days=120, + shared=True, + ), + storage_models.StorageMount( + logical_name=storage_models.LogicalName.shared, + path="/global/cfs/cdirs/{project}/shared", + access=_ro, + filesystem="gpfs-cfs", + performance_tier="medium", + quota_bytes=None, + available_bytes=None, + purge_policy_days=None, + shared=True, + ), + storage_models.StorageMount( + logical_name=storage_models.LogicalName.temporary, + path="/tmp/{user}", + access=_rw, + filesystem="tmpfs", + performance_tier="high", + quota_bytes=512 * 1024**3, + available_bytes=480 * 1024**3, + purge_policy_days=7, + shared=False, + ), + ] + + # Login nodes: same filesystem layout as CFS — outside-of-job semantics for everything. + self.locations[login.id] = self.locations[cfs.id] + # Populate site resource_ids based on which resources are at each site site1.resource_ids = [r.id for r in self.resources if r.site_id == site1.id] site2.resource_ids = [r.id for r in self.resources if r.site_id == site2.id] @@ -623,6 +775,129 @@ async def cancel_job( # call slurm/etc. to cancel job return True +# ---------------------------------------------- +# Storage API +# ---------------------------------------------- + + @staticmethod + def _slugify_project(name: str) -> str: + """Convert a project name to a path-safe slug (real facilities use codes like 'm1234').""" + return name.lower().replace(" ", "_") + + def _user_project_codes(self, user: User) -> list[str]: + """Return the path-slug codes of all projects the user belongs to.""" + return [self._slugify_project(p.name) for p in self.projects if user.id in p.user_ids] + + def _user_member_of(self, user: User, project_code: str) -> bool: + """Authorization check: is the user a member of the named project?""" + return any( + user.id in p.user_ids and self._slugify_project(p.name) == project_code + for p in self.projects + ) + + def _resolve_path(self, template: str, user: User, project: str | None) -> str: + first = user.id[0] if user.id else "u" + path = template.replace("{user}", user.id).replace("{first}", first) + if project: + path = path.replace("{project}", project) + return path + + def _apply_intent_filter( + self, + mount: storage_models.StorageMount, + intent: storage_models.StorageIntent | None, + ) -> bool: + """Return False if this mount should be excluded for the given intent.""" + if intent == storage_models.StorageIntent.long_term_storage: + return mount.logical_name == storage_models.LogicalName.archive + if intent == storage_models.StorageIntent.staging: + return mount.logical_name != storage_models.LogicalName.archive + if intent == storage_models.StorageIntent.write: + return mount.access.write + return True + + async def get_logical_names(self) -> list[storage_models.LogicalName]: + return list(storage_models.LogicalName) + + async def get_locations( + self, + resource: status_models.Resource, + user: User, + logicalpath: storage_models.LogicalName | None, + project: str | None, + allocation: str | None, + intent: storage_models.StorageIntent | None, + ) -> list[storage_models.StorageLocation]: + templates = self.locations.get(resource.id, []) + effective_project = project or allocation + + # Authorization: a user can only resolve paths for their own projects + if effective_project and not self._user_member_of(user, effective_project): + raise HTTPException(status_code=403, detail=f"User is not a member of project '{effective_project}'") + + # Expand project-scoped paths across ALL of the user's projects when none specified + project_codes = [effective_project] if effective_project else self._user_project_codes(user) + + result = [] + for m in templates: + if logicalpath and m.logical_name != logicalpath: + continue + if not self._apply_intent_filter(m, intent): + continue + + is_project_scoped = "{project}" in m.path + expand_over = project_codes if is_project_scoped else [None] + + for code in expand_over: + result.append(storage_models.StorageLocation( + logical_name=m.logical_name, + path=self._resolve_path(m.path, user, code), + filesystem=m.filesystem, + performance_tier=m.performance_tier, + quota_bytes=m.quota_bytes, + available_bytes=m.available_bytes, + purge_policy_days=m.purge_policy_days, + shared=m.shared, + access=m.access, + )) + return result + + async def get_mounts( + self, + resource: status_models.Resource, + user: User, + project: str | None, + intent: storage_models.StorageIntent | None, + ) -> list[storage_models.StorageMount]: + templates = self.locations.get(resource.id, []) + + if project and not self._user_member_of(user, project): + raise HTTPException(status_code=403, detail=f"User is not a member of project '{project}'") + + project_codes = [project] if project else self._user_project_codes(user) + + result = [] + for m in templates: + if not self._apply_intent_filter(m, intent): + continue + + is_project_scoped = "{project}" in m.path + expand_over = project_codes if is_project_scoped else [None] + + for code in expand_over: + result.append(storage_models.StorageMount( + logical_name=m.logical_name, + path=self._resolve_path(m.path, user, code), + access=m.access, + filesystem=m.filesystem, + performance_tier=m.performance_tier, + quota_bytes=m.quota_bytes, + available_bytes=m.available_bytes, + purge_policy_days=m.purge_policy_days, + shared=m.shared, + )) + return result + def validate_path(self, path: str, allow_symlinks: bool = True) -> str: """Validate that the given path is within the sandbox base directory and optionally check for symlinks.""" basedir = PathSandbox.get_base_temp_dir() diff --git a/app/main.py b/app/main.py index 1f6faccc..475cfa86 100644 --- a/app/main.py +++ b/app/main.py @@ -25,6 +25,7 @@ from app.routers.account import account from app.routers.compute import compute from app.routers.filesystem import filesystem +from app.routers.storage import storage from app.routers.task import task configure_logging(config.LOG_LEVEL) @@ -81,6 +82,7 @@ async def dispatch(self, request: Request, call_next): APP.include_router(account.router, prefix=api_prefix) APP.include_router(compute.router, prefix=api_prefix) APP.include_router(filesystem.router, prefix=api_prefix) +APP.include_router(storage.router, prefix=api_prefix) APP.include_router(task.router, prefix=api_prefix) logging.getLogger().info(f"API path: {api_prefix}") diff --git a/app/routers/storage/__init__.py b/app/routers/storage/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/app/routers/storage/facility_adapter.py b/app/routers/storage/facility_adapter.py new file mode 100644 index 00000000..ae88fd8e --- /dev/null +++ b/app/routers/storage/facility_adapter.py @@ -0,0 +1,59 @@ +from abc import abstractmethod + +from ...types.user import User +from ..status import models as status_models +from . import models as storage_models +from ..iri_router import AuthenticatedAdapter + + +class FacilityAdapter(AuthenticatedAdapter): + """ + Facility-specific storage location adapter. + Use the `IRI_API_ADAPTER_storage` environment variable + (defaults to `app.demo_adapter.DemoAdapter`) to install your implementation. + """ + + @abstractmethod + async def get_logical_names(self) -> list[storage_models.LogicalName]: + """Return the logical filesystem tier names supported at this facility.""" + pass + + @abstractmethod + async def get_locations( + self, + resource: status_models.Resource, + user: User, + logicalpath: storage_models.LogicalName | None, + project: str | None, + allocation: str | None, + intent: storage_models.StorageIntent | None, + ) -> list[storage_models.StorageLocation]: + """ + Return resolved storage paths for the user at the given resource. + Results are optionally filtered by logical name, project/allocation, and intent. + + Intent semantics: + - staging: exclude archive (too slow for staging workflows) + - long-term-storage: return only archive + - write: exclude paths that are read-only in a job context + - read: no filtering (all accessible paths) + """ + pass + + @abstractmethod + async def get_mounts( + self, + resource: status_models.Resource, + user: User, + project: str | None, + intent: storage_models.StorageIntent | None, + ) -> list[storage_models.StorageMount]: + """ + Return all storage volumes mounted at the resource. The access permissions + in each StorageMount reflect what the user can do *through this resource_id*: + a compute resource shows in-job permissions; a login / DTN / Globus resource + shows what that endpoint can do. Callers select the appropriate resource_id + for the context they need (e.g. compute resource for jobs, Globus collection + for transfers). + """ + pass diff --git a/app/routers/storage/models.py b/app/routers/storage/models.py new file mode 100644 index 00000000..7757195c --- /dev/null +++ b/app/routers/storage/models.py @@ -0,0 +1,134 @@ +"""Models for storage location and mount API endpoints.""" +from enum import Enum +from pydantic import Field, BaseModel + + +class LogicalName(str, Enum): + """Well-known logical filesystem tier names across HPC facilities.""" + home = "home" + scratch = "scratch" + project = "project" + campaign = "campaign" + archive = "archive" + shared = "shared" + temporary = "temporary" + + +class StorageIntent(str, Enum): + """Intended use hint to filter returned storage locations.""" + read = "read" + write = "write" + staging = "staging" + long_term_storage = "long-term-storage" + + +class AccessPermissions(BaseModel): + """POSIX-style access permissions for a storage location.""" + read: bool = Field(..., description="Read permission", example=True) + write: bool = Field(..., description="Write permission", example=True) + execute: bool = Field(..., description="Execute/traverse permission", example=True) + + +class StorageLocation(BaseModel): + """ + Resolved storage path for a user at a resource, for a given logical filesystem tier. + Answers: given this user/project/intent, where should data live at this facility? + """ + logical_name: LogicalName = Field( + ..., + description="Logical filesystem tier name", + example="scratch", + ) + path: str = Field( + ..., + description="Absolute resolved path for this user at the resource", + example="/pscratch/sd/j/jbalcas", + ) + filesystem: str | None = Field( + default=None, + description="Underlying filesystem type or label", + example="lustre-scratch", + ) + performance_tier: str | None = Field( + default=None, + description="Performance tier classification (high / medium / low / tape)", + example="high", + ) + quota_bytes: int | None = Field( + default=None, + description="Total quota in bytes (None = unlimited or unknown)", + example=5000000000000, + ) + available_bytes: int | None = Field( + default=None, + description="Available bytes remaining within the quota", + example=4200000000000, + ) + purge_policy_days: int | None = Field( + default=None, + description="Days of inactivity before automatic purge; None means no purge policy", + example=30, + ) + shared: bool = Field( + default=False, + description="True if the path is shared across multiple users or projects", + example=False, + ) + access: AccessPermissions = Field( + ..., + description="Access permissions at this location", + ) + + +class StorageMount(BaseModel): + """ + A storage volume mounted at a resource. The access permissions reflect what the + user can do *through this resource_id*: a compute resource shows in-job semantics, + a login/DTN/Globus resource shows the permissions available from that endpoint. + Callers query the appropriate resource_id for the context they need. + """ + logical_name: LogicalName = Field( + ..., + description="Logical filesystem tier name", + example="scratch", + ) + path: str = Field( + ..., + description="Absolute mount path visible to the user", + example="/pscratch/sd/j/jbalcas", + ) + access: AccessPermissions = Field( + ..., + description="Access permissions for this volume through this resource_id " + "(compute resource = in-job; login/DTN/Globus resource = outside-job).", + ) + filesystem: str | None = Field( + default=None, + description="Underlying filesystem type or label", + example="lustre-scratch", + ) + performance_tier: str | None = Field( + default=None, + description="Performance tier classification (high / medium / low / tape)", + example="high", + ) + quota_bytes: int | None = Field( + default=None, + description="Total quota in bytes (None = unlimited or unknown)", + example=5000000000000, + ) + available_bytes: int | None = Field( + default=None, + description="Available bytes remaining within the quota", + example=4200000000000, + ) + purge_policy_days: int | None = Field( + default=None, + description="Days of inactivity before automatic purge; None means no purge policy", + example=30, + ) + shared: bool = Field( + default=False, + description="True if the path is shared across multiple users or projects", + example=False, + ) diff --git a/app/routers/storage/storage.py b/app/routers/storage/storage.py new file mode 100644 index 00000000..09ef7c92 --- /dev/null +++ b/app/routers/storage/storage.py @@ -0,0 +1,124 @@ +from typing import Annotated + +from fastapi import Depends, HTTPException, Query, Request, status + +from ...types.http import forbidExtraQueryParams +from ...types.user import User +from .. import iri_router +from ..error_handlers import DEFAULT_RESPONSES +from ..iri_meta import iri_meta_dict +from ..status.status import router as status_router +from . import facility_adapter, models + + +router = iri_router.IriRouter( + facility_adapter.FacilityAdapter, + prefix="/storage", + tags=["storage"], +) + + +@router.get( + "/locations", + summary="List supported logical filesystem names", + description=( + "Return the logical filesystem tier names supported at this facility " + "(e.g. home, scratch, project, archive). Not every facility is required to " + "support all names." + ), + status_code=status.HTTP_200_OK, + response_model=list[models.LogicalName], + responses=DEFAULT_RESPONSES, + operation_id="getStorageLogicalNames", + openapi_extra=iri_meta_dict("in_development", "optional"), +) +async def get_logical_names( + request: Request, + _forbid=Depends(forbidExtraQueryParams()), +) -> list[models.LogicalName]: + return await router.adapter.get_logical_names() + + +@router.get( + "/locations/{resource_id}", + summary="Get resolved storage locations for a resource", + description=( + "Return the resolved storage paths for the authenticated user at the specified " + "resource. Optionally filter by logical name, project/allocation, and intent.\n\n" + "Intent semantics:\n" + "- `staging`: excludes archive (too slow for staging workflows)\n" + "- `long-term-storage`: returns only archive\n" + "- `write`: excludes paths that are read-only in a compute-job context\n" + "- `read`: no filtering\n" + ), + status_code=status.HTTP_200_OK, + response_model=list[models.StorageLocation], + responses=DEFAULT_RESPONSES, + operation_id="getStorageLocations", + openapi_extra=iri_meta_dict("in_development", "optional"), +) +async def get_locations( + resource_id: str, + request: Request, + logicalpath: Annotated[ + models.LogicalName | None, + Query(description="Filter to a specific logical filesystem tier"), + ] = None, + project: Annotated[ + str | None, + Query(description="Project or allocation identifier for project-scoped paths"), + ] = None, + allocation: Annotated[ + str | None, + Query(description="Allocation identifier (alternative to project)"), + ] = None, + intent: Annotated[ + models.StorageIntent | None, + Query(description="Intended use to guide which locations are returned"), + ] = None, + user: User = Depends(router.current_user), + _forbid=Depends(forbidExtraQueryParams("logicalpath", "project", "allocation", "intent")), +) -> list[models.StorageLocation]: + resource = await status_router.adapter.get_resource(resource_id) + if not resource: + raise HTTPException(status_code=404, detail="Resource not found") + locations = await router.adapter.get_locations(resource, user, logicalpath, project, allocation, intent) + if logicalpath and not locations: + raise HTTPException(status_code=404, detail=f"No storage location found for logical name '{logicalpath}'") + return locations + + +@router.get( + "/mounts/{resource_id}", + summary="Get storage mount points for a resource", + description=( + "Return all storage volumes mounted at the specified resource, together with " + "both in-job and outside-job (login / data-transfer node) access permissions.\n\n" + "This answers: for a given resource in a given execution context " + "(inside a running job, or on a login/DTN node), what volumes are mounted " + "and with what read/write/execute permissions?" + ), + status_code=status.HTTP_200_OK, + response_model=list[models.StorageMount], + responses=DEFAULT_RESPONSES, + operation_id="getStorageMounts", + openapi_extra=iri_meta_dict("in_development", "optional"), +) +async def get_mounts( + resource_id: str, + request: Request, + project: Annotated[ + str | None, + Query(description="Project or allocation identifier for project-scoped paths"), + ] = None, + intent: Annotated[ + models.StorageIntent | None, + Query(description="Intended use to filter returned mounts"), + ] = None, + user: User = Depends(router.current_user), + _forbid=Depends(forbidExtraQueryParams("project", "intent")), +) -> list[models.StorageMount]: + resource = await status_router.adapter.get_resource(resource_id) + if not resource: + raise HTTPException(status_code=404, detail="Resource not found") + return await router.adapter.get_mounts(resource, user, project, intent) From ffadd658ed608a4a6cfe66c60c23389db90f3cf4 Mon Sep 17 00:00:00 2001 From: Justas Balcas Date: Thu, 14 May 2026 09:24:48 -0500 Subject: [PATCH 2/2] Simplify storage endpoint --- app/demo_adapter.py | 77 +++++------------- app/routers/storage/facility_adapter.py | 29 +------ app/routers/storage/models.py | 61 +-------------- app/routers/storage/storage.py | 65 ++------------- test/test_storage.py | 100 ++++++++++++++++++++++++ 5 files changed, 131 insertions(+), 201 deletions(-) create mode 100644 test/test_storage.py diff --git a/app/demo_adapter.py b/app/demo_adapter.py index 75faa13a..f0bd2acd 100644 --- a/app/demo_adapter.py +++ b/app/demo_adapter.py @@ -113,7 +113,7 @@ def __init__(self): self.project_allocations = [] self.user_allocations = [] self.facility = {} - self.locations = {} # resource_id -> list[StorageMount] + self.locations = {} # resource_id -> list[StorageInstance templates] self.sites = [] self._init_state() @@ -259,7 +259,7 @@ def _init_state(self): # Perlmutter compute nodes: in-job semantics. Home is read-only inside a job; # archive (HPSS) is not accessible from compute, so it isn't mounted here at all. self.locations[pm.id] = [ - storage_models.StorageMount( + storage_models.StorageInstance( logical_name=storage_models.LogicalName.home, path="/global/homes/{first}/{user}", access=_ro, @@ -270,7 +270,7 @@ def _init_state(self): purge_policy_days=None, shared=False, ), - storage_models.StorageMount( + storage_models.StorageInstance( logical_name=storage_models.LogicalName.scratch, path="/pscratch/sd/{first}/{user}", access=_rw, @@ -281,7 +281,7 @@ def _init_state(self): purge_policy_days=30, shared=False, ), - storage_models.StorageMount( + storage_models.StorageInstance( logical_name=storage_models.LogicalName.project, path="/global/project/projectdirs/{project}/{user}", access=_rw, @@ -292,7 +292,7 @@ def _init_state(self): purge_policy_days=None, shared=True, ), - storage_models.StorageMount( + storage_models.StorageInstance( logical_name=storage_models.LogicalName.campaign, path="/global/cfs/cdirs/{project}/campaign/{user}", access=_rw, @@ -308,7 +308,7 @@ def _init_state(self): # HPSS tape system: archive only; user accesses it through this resource_id # (typically via login nodes or htar). Archive is rw from this resource. self.locations[hpss.id] = [ - storage_models.StorageMount( + storage_models.StorageInstance( logical_name=storage_models.LogicalName.archive, path="/home/{first}/{user}", access=_rw, @@ -324,7 +324,7 @@ def _init_state(self): # CFS / GPFS resource (queried via login nodes / DTN-style endpoint): all tiers rw, # shared is read-only because it's the project-shared landing area. self.locations[cfs.id] = [ - storage_models.StorageMount( + storage_models.StorageInstance( logical_name=storage_models.LogicalName.home, path="/global/homes/{first}/{user}", access=_rw, @@ -335,7 +335,7 @@ def _init_state(self): purge_policy_days=None, shared=False, ), - storage_models.StorageMount( + storage_models.StorageInstance( logical_name=storage_models.LogicalName.scratch, path="/pscratch/sd/{first}/{user}", access=_rw, @@ -346,7 +346,7 @@ def _init_state(self): purge_policy_days=30, shared=False, ), - storage_models.StorageMount( + storage_models.StorageInstance( logical_name=storage_models.LogicalName.project, path="/global/project/projectdirs/{project}/{user}", access=_rw, @@ -357,7 +357,7 @@ def _init_state(self): purge_policy_days=None, shared=True, ), - storage_models.StorageMount( + storage_models.StorageInstance( logical_name=storage_models.LogicalName.campaign, path="/global/cfs/cdirs/{project}/campaign/{user}", access=_rw, @@ -368,7 +368,7 @@ def _init_state(self): purge_policy_days=120, shared=True, ), - storage_models.StorageMount( + storage_models.StorageInstance( logical_name=storage_models.LogicalName.shared, path="/global/cfs/cdirs/{project}/shared", access=_ro, @@ -379,7 +379,7 @@ def _init_state(self): purge_policy_days=None, shared=True, ), - storage_models.StorageMount( + storage_models.StorageInstance( logical_name=storage_models.LogicalName.temporary, path="/tmp/{user}", access=_rw, @@ -804,21 +804,18 @@ def _resolve_path(self, template: str, user: User, project: str | None) -> str: def _apply_intent_filter( self, - mount: storage_models.StorageMount, + instance: storage_models.StorageInstance, intent: storage_models.StorageIntent | None, ) -> bool: - """Return False if this mount should be excluded for the given intent.""" + """Return False if this storage instance should be excluded for the given intent.""" if intent == storage_models.StorageIntent.long_term_storage: - return mount.logical_name == storage_models.LogicalName.archive + return instance.logical_name == storage_models.LogicalName.archive if intent == storage_models.StorageIntent.staging: - return mount.logical_name != storage_models.LogicalName.archive + return instance.logical_name != storage_models.LogicalName.archive if intent == storage_models.StorageIntent.write: - return mount.access.write + return instance.access.write return True - async def get_logical_names(self) -> list[storage_models.LogicalName]: - return list(storage_models.LogicalName) - async def get_locations( self, resource: status_models.Resource, @@ -827,7 +824,7 @@ async def get_locations( project: str | None, allocation: str | None, intent: storage_models.StorageIntent | None, - ) -> list[storage_models.StorageLocation]: + ) -> list[storage_models.StorageInstance]: templates = self.locations.get(resource.id, []) effective_project = project or allocation @@ -849,7 +846,7 @@ async def get_locations( expand_over = project_codes if is_project_scoped else [None] for code in expand_over: - result.append(storage_models.StorageLocation( + result.append(storage_models.StorageInstance( logical_name=m.logical_name, path=self._resolve_path(m.path, user, code), filesystem=m.filesystem, @@ -862,42 +859,6 @@ async def get_locations( )) return result - async def get_mounts( - self, - resource: status_models.Resource, - user: User, - project: str | None, - intent: storage_models.StorageIntent | None, - ) -> list[storage_models.StorageMount]: - templates = self.locations.get(resource.id, []) - - if project and not self._user_member_of(user, project): - raise HTTPException(status_code=403, detail=f"User is not a member of project '{project}'") - - project_codes = [project] if project else self._user_project_codes(user) - - result = [] - for m in templates: - if not self._apply_intent_filter(m, intent): - continue - - is_project_scoped = "{project}" in m.path - expand_over = project_codes if is_project_scoped else [None] - - for code in expand_over: - result.append(storage_models.StorageMount( - logical_name=m.logical_name, - path=self._resolve_path(m.path, user, code), - access=m.access, - filesystem=m.filesystem, - performance_tier=m.performance_tier, - quota_bytes=m.quota_bytes, - available_bytes=m.available_bytes, - purge_policy_days=m.purge_policy_days, - shared=m.shared, - )) - return result - def validate_path(self, path: str, allow_symlinks: bool = True) -> str: """Validate that the given path is within the sandbox base directory and optionally check for symlinks.""" basedir = PathSandbox.get_base_temp_dir() diff --git a/app/routers/storage/facility_adapter.py b/app/routers/storage/facility_adapter.py index ae88fd8e..d650dc58 100644 --- a/app/routers/storage/facility_adapter.py +++ b/app/routers/storage/facility_adapter.py @@ -13,11 +13,6 @@ class FacilityAdapter(AuthenticatedAdapter): (defaults to `app.demo_adapter.DemoAdapter`) to install your implementation. """ - @abstractmethod - async def get_logical_names(self) -> list[storage_models.LogicalName]: - """Return the logical filesystem tier names supported at this facility.""" - pass - @abstractmethod async def get_locations( self, @@ -27,9 +22,11 @@ async def get_locations( project: str | None, allocation: str | None, intent: storage_models.StorageIntent | None, - ) -> list[storage_models.StorageLocation]: + ) -> list[storage_models.StorageInstance]: """ - Return resolved storage paths for the user at the given resource. + Return resolved storage paths for the user at the given resource. The returned + instances also capture the access semantics of that resource context, so callers + do not need a separate mounts endpoint. Results are optionally filtered by logical name, project/allocation, and intent. Intent semantics: @@ -39,21 +36,3 @@ async def get_locations( - read: no filtering (all accessible paths) """ pass - - @abstractmethod - async def get_mounts( - self, - resource: status_models.Resource, - user: User, - project: str | None, - intent: storage_models.StorageIntent | None, - ) -> list[storage_models.StorageMount]: - """ - Return all storage volumes mounted at the resource. The access permissions - in each StorageMount reflect what the user can do *through this resource_id*: - a compute resource shows in-job permissions; a login / DTN / Globus resource - shows what that endpoint can do. Callers select the appropriate resource_id - for the context they need (e.g. compute resource for jobs, Globus collection - for transfers). - """ - pass diff --git a/app/routers/storage/models.py b/app/routers/storage/models.py index 7757195c..4aaa7c81 100644 --- a/app/routers/storage/models.py +++ b/app/routers/storage/models.py @@ -29,10 +29,9 @@ class AccessPermissions(BaseModel): execute: bool = Field(..., description="Execute/traverse permission", example=True) -class StorageLocation(BaseModel): +class StorageInstance(BaseModel): """ - Resolved storage path for a user at a resource, for a given logical filesystem tier. - Answers: given this user/project/intent, where should data live at this facility? + A concrete storage instance visible through a resource for a given logical filesystem tier. """ logical_name: LogicalName = Field( ..., @@ -76,59 +75,5 @@ class StorageLocation(BaseModel): ) access: AccessPermissions = Field( ..., - description="Access permissions at this location", - ) - - -class StorageMount(BaseModel): - """ - A storage volume mounted at a resource. The access permissions reflect what the - user can do *through this resource_id*: a compute resource shows in-job semantics, - a login/DTN/Globus resource shows the permissions available from that endpoint. - Callers query the appropriate resource_id for the context they need. - """ - logical_name: LogicalName = Field( - ..., - description="Logical filesystem tier name", - example="scratch", - ) - path: str = Field( - ..., - description="Absolute mount path visible to the user", - example="/pscratch/sd/j/jbalcas", - ) - access: AccessPermissions = Field( - ..., - description="Access permissions for this volume through this resource_id " - "(compute resource = in-job; login/DTN/Globus resource = outside-job).", - ) - filesystem: str | None = Field( - default=None, - description="Underlying filesystem type or label", - example="lustre-scratch", - ) - performance_tier: str | None = Field( - default=None, - description="Performance tier classification (high / medium / low / tape)", - example="high", - ) - quota_bytes: int | None = Field( - default=None, - description="Total quota in bytes (None = unlimited or unknown)", - example=5000000000000, - ) - available_bytes: int | None = Field( - default=None, - description="Available bytes remaining within the quota", - example=4200000000000, - ) - purge_policy_days: int | None = Field( - default=None, - description="Days of inactivity before automatic purge; None means no purge policy", - example=30, - ) - shared: bool = Field( - default=False, - description="True if the path is shared across multiple users or projects", - example=False, + description="Access permissions through the queried resource context", ) diff --git a/app/routers/storage/storage.py b/app/routers/storage/storage.py index 09ef7c92..cdfc89ac 100644 --- a/app/routers/storage/storage.py +++ b/app/routers/storage/storage.py @@ -18,33 +18,14 @@ ) -@router.get( - "/locations", - summary="List supported logical filesystem names", - description=( - "Return the logical filesystem tier names supported at this facility " - "(e.g. home, scratch, project, archive). Not every facility is required to " - "support all names." - ), - status_code=status.HTTP_200_OK, - response_model=list[models.LogicalName], - responses=DEFAULT_RESPONSES, - operation_id="getStorageLogicalNames", - openapi_extra=iri_meta_dict("in_development", "optional"), -) -async def get_logical_names( - request: Request, - _forbid=Depends(forbidExtraQueryParams()), -) -> list[models.LogicalName]: - return await router.adapter.get_logical_names() - - @router.get( "/locations/{resource_id}", summary="Get resolved storage locations for a resource", description=( "Return the resolved storage paths for the authenticated user at the specified " - "resource. Optionally filter by logical name, project/allocation, and intent.\n\n" + "resource. The response includes both placement and access semantics for that " + "resource context, so it can be used in place of a separate mounts endpoint. " + "Optionally filter by logical name, project/allocation, and intent.\n\n" "Intent semantics:\n" "- `staging`: excludes archive (too slow for staging workflows)\n" "- `long-term-storage`: returns only archive\n" @@ -52,7 +33,7 @@ async def get_logical_names( "- `read`: no filtering\n" ), status_code=status.HTTP_200_OK, - response_model=list[models.StorageLocation], + response_model=list[models.StorageInstance], responses=DEFAULT_RESPONSES, operation_id="getStorageLocations", openapi_extra=iri_meta_dict("in_development", "optional"), @@ -78,7 +59,7 @@ async def get_locations( ] = None, user: User = Depends(router.current_user), _forbid=Depends(forbidExtraQueryParams("logicalpath", "project", "allocation", "intent")), -) -> list[models.StorageLocation]: +) -> list[models.StorageInstance]: resource = await status_router.adapter.get_resource(resource_id) if not resource: raise HTTPException(status_code=404, detail="Resource not found") @@ -86,39 +67,3 @@ async def get_locations( if logicalpath and not locations: raise HTTPException(status_code=404, detail=f"No storage location found for logical name '{logicalpath}'") return locations - - -@router.get( - "/mounts/{resource_id}", - summary="Get storage mount points for a resource", - description=( - "Return all storage volumes mounted at the specified resource, together with " - "both in-job and outside-job (login / data-transfer node) access permissions.\n\n" - "This answers: for a given resource in a given execution context " - "(inside a running job, or on a login/DTN node), what volumes are mounted " - "and with what read/write/execute permissions?" - ), - status_code=status.HTTP_200_OK, - response_model=list[models.StorageMount], - responses=DEFAULT_RESPONSES, - operation_id="getStorageMounts", - openapi_extra=iri_meta_dict("in_development", "optional"), -) -async def get_mounts( - resource_id: str, - request: Request, - project: Annotated[ - str | None, - Query(description="Project or allocation identifier for project-scoped paths"), - ] = None, - intent: Annotated[ - models.StorageIntent | None, - Query(description="Intended use to filter returned mounts"), - ] = None, - user: User = Depends(router.current_user), - _forbid=Depends(forbidExtraQueryParams("project", "intent")), -) -> list[models.StorageMount]: - resource = await status_router.adapter.get_resource(resource_id) - if not resource: - raise HTTPException(status_code=404, detail="Resource not found") - return await router.adapter.get_mounts(resource, user, project, intent) diff --git a/test/test_storage.py b/test/test_storage.py new file mode 100644 index 00000000..cad737e6 --- /dev/null +++ b/test/test_storage.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python3 +"""Focused regression tests for the remaining storage endpoint contract and OpenAPI wiring.""" + +import asyncio +import os +import unittest + +os.environ.setdefault("IRI_SHOW_MISSING_ROUTES", "true") + +from app.demo_adapter import DemoAdapter +from app.main import APP +from app.routers.storage import models as storage_models + + +class StorageEndpointTests(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.adapter = DemoAdapter() + cls.user = cls.adapter.user + cls.openapi = APP.openapi() + + @classmethod + def _resource(cls, group: str, name: str): + for resource in cls.adapter.resources: + if resource.group == group and resource.name == name: + return resource + raise AssertionError(f"Unable to find resource {group}/{name}") + + def test_resolved_locations_return_shared_storage_instance_shape(self): + compute_resource = self._resource("perlmutter", "compute nodes") + + payload = asyncio.run( + self.adapter.get_locations( + compute_resource, + self.user, + None, + None, + None, + None, + ) + ) + + self.assertGreater(len(payload), 0) + self.assertTrue(all(isinstance(item, storage_models.StorageInstance) for item in payload)) + + first = payload[0].model_dump() + self.assertEqual( + set(first.keys()), + { + "logical_name", + "path", + "filesystem", + "performance_tier", + "quota_bytes", + "available_bytes", + "purge_policy_days", + "shared", + "access", + }, + ) + home_entries = [entry for entry in payload if entry.logical_name == storage_models.LogicalName.home] + self.assertEqual(len(home_entries), 1) + self.assertFalse(home_entries[0].access.write) + + def test_project_scoped_entries_expand_under_remaining_locations_endpoint(self): + login_resource = self._resource("perlmutter", "login nodes") + + location_payload = asyncio.run( + self.adapter.get_locations( + login_resource, + self.user, + storage_models.LogicalName.shared, + None, + None, + None, + ) + ) + + self.assertGreater(len(location_payload), 0) + self.assertTrue( + all(entry.logical_name == storage_models.LogicalName.shared for entry in location_payload) + ) + self.assertTrue( + all(not entry.access.write for entry in location_payload) + ) + self.assertEqual(len(location_payload), len(self.adapter._user_project_codes(self.user))) + + def test_openapi_exposes_only_resource_scoped_storage_locations(self): + resolved_locations = self.openapi["paths"]["/api/v1/storage/locations/{resource_id}"]["get"] + self.assertNotIn("/api/v1/storage/locations", self.openapi["paths"]) + self.assertNotIn("/api/v1/storage/mounts/{resource_id}", self.openapi["paths"]) + self.assertTrue( + resolved_locations["responses"]["200"]["content"]["application/json"]["schema"]["items"]["$ref"].endswith( + "/StorageInstance" + ) + ) + + +if __name__ == "__main__": + unittest.main()