Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ dev: deps
IRI_API_ADAPTER_account=app.demo_adapter.DemoAdapter \
IRI_API_ADAPTER_compute=app.demo_adapter.DemoAdapter \
IRI_API_ADAPTER_filesystem=app.demo_adapter.DemoAdapter \
IRI_API_ADAPTER_storage=app.demo_adapter.DemoAdapter \
IRI_API_ADAPTER_task=app.demo_adapter.DemoAdapter \
IRI_LOG_FILE="$${IRI_LOG_FILE:-$${LOG_FILE:-$(IRI_LOG_FILE)}}" \
IRI_LOG_ROTATION_DAYS="$${IRI_LOG_ROTATION_DAYS:-$${LOG_ROTATION_DAYS:-$(IRI_LOG_ROTATION_DAYS)}}" \
Expand Down
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,21 @@ Links to data, created by this api, will concatenate these values producing link

- `IRI_API_PARAMS`: as described above, this is a way to customize the API meta-data
- `IRI_API_ADAPTER_*`: these values specify the business logic for the per-api-group implementation of a facility_adapter. For example: `IRI_API_ADAPTER_status=myfacility.MyFacilityStatusAdapter` would load the implementation of the `app.routers.status.facility_adapter.FacilityAdapter` abstract class to handle the `status` business logic for your facility.

The full list of router adapters and the abstract base class each must implement:

| Variable | Mounted at | Abstract base class your adapter must subclass |
|---|---|---|
| `IRI_API_ADAPTER_facility` | `/facility/...` | [`app.routers.facility.facility_adapter.FacilityAdapter`](app/routers/facility/facility_adapter.py) |
| `IRI_API_ADAPTER_status` | `/status/...` | [`app.routers.status.facility_adapter.FacilityAdapter`](app/routers/status/facility_adapter.py) |
| `IRI_API_ADAPTER_account` | `/account/...` | [`app.routers.account.facility_adapter.FacilityAdapter`](app/routers/account/facility_adapter.py) |
| `IRI_API_ADAPTER_compute` | `/compute/...` | [`app.routers.compute.facility_adapter.FacilityAdapter`](app/routers/compute/facility_adapter.py) |
| `IRI_API_ADAPTER_filesystem` | `/filesystem/...` | [`app.routers.filesystem.facility_adapter.FacilityAdapter`](app/routers/filesystem/facility_adapter.py) |
| `IRI_API_ADAPTER_storage` | `/storage/...` | [`app.routers.storage.facility_adapter.FacilityAdapter`](app/routers/storage/facility_adapter.py) |
| `IRI_API_ADAPTER_task` | `/task/...` | [`app.routers.task.facility_adapter.FacilityAdapter`](app/routers/task/facility_adapter.py) |

Each value is a `module.path.ClassName` string. `app.demo_adapter.DemoAdapter` implements all of them and is what `make dev` wires up by default. A router whose `IRI_API_ADAPTER_*` is not set is hidden from the API at startup unless `IRI_SHOW_MISSING_ROUTES=true`.

- `IRI_SHOW_MISSING_ROUTES`: hide api groups that don't have an `IRI_API_ADAPTER_*` environment variable defined, if set to `true`. This way if your facility only wishes to expose some api groups but not others, they can be hidden. (Defaults to `false`.)

### Logging
Expand Down
240 changes: 238 additions & 2 deletions app/demo_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
from .routers.facility import models as facility_models
from .routers.filesystem import facility_adapter as filesystem_adapter
from .routers.filesystem import models as filesystem_models
from .routers.storage import facility_adapter as storage_adapter
from .routers.storage import models as storage_models
from .routers.status import facility_adapter as status_adapter
from .routers.status import models as status_models
from .routers.task import facility_adapter as task_adapter
Expand Down Expand Up @@ -96,7 +98,9 @@ def utc_timestamp() -> int:


class DemoAdapter(
status_adapter.FacilityAdapter, account_adapter.FacilityAdapter, compute_adapter.FacilityAdapter, filesystem_adapter.FacilityAdapter, task_adapter.FacilityAdapter, facility_adapter.FacilityAdapter
status_adapter.FacilityAdapter, account_adapter.FacilityAdapter, compute_adapter.FacilityAdapter,
filesystem_adapter.FacilityAdapter, storage_adapter.FacilityAdapter,
task_adapter.FacilityAdapter, facility_adapter.FacilityAdapter
):
"""A demo implementation of the FacilityAdapter that returns hardcoded data."""
def __init__(self):
Expand All @@ -109,7 +113,7 @@ def __init__(self):
self.project_allocations = []
self.user_allocations = []
self.facility = {}
self.locations = []
self.locations = {} # resource_id -> list[StorageInstance templates]
self.sites = []
self._init_state()

Expand Down Expand Up @@ -243,6 +247,154 @@ def _init_state(self):

self.resources = [pm, hpss, cfs, login, iris, sfapi]

_rw = storage_models.AccessPermissions(read=True, write=True, execute=True)
_ro = storage_models.AccessPermissions(read=True, write=False, execute=True)

# Paths use {user}, {first} (first letter of username), and {project} as placeholders.
# Project-scoped entries (containing {project}) are expanded per-project at query time.
# Each resource_id carries the access semantics for its own context — a compute
# resource shows in-job permissions, a login/DTN/Globus resource shows what that
# endpoint can do. There is no separate access_outside_of_job field.

# Perlmutter compute nodes: in-job semantics. Home is read-only inside a job;
# archive (HPSS) is not accessible from compute, so it isn't mounted here at all.
self.locations[pm.id] = [
storage_models.StorageInstance(
logical_name=storage_models.LogicalName.home,
path="/global/homes/{first}/{user}",
access=_ro,
filesystem="gpfs-homes",
performance_tier="medium",
quota_bytes=40 * 1024**3,
available_bytes=28 * 1024**3,
purge_policy_days=None,
shared=False,
),
storage_models.StorageInstance(
logical_name=storage_models.LogicalName.scratch,
path="/pscratch/sd/{first}/{user}",
access=_rw,
filesystem="lustre-scratch",
performance_tier="high",
quota_bytes=20 * 1024**4,
available_bytes=14 * 1024**4,
purge_policy_days=30,
shared=False,
),
storage_models.StorageInstance(
logical_name=storage_models.LogicalName.project,
path="/global/project/projectdirs/{project}/{user}",
access=_rw,
filesystem="gpfs-project",
performance_tier="medium",
quota_bytes=2 * 1024**4,
available_bytes=1024**4,
purge_policy_days=None,
shared=True,
),
storage_models.StorageInstance(
logical_name=storage_models.LogicalName.campaign,
path="/global/cfs/cdirs/{project}/campaign/{user}",
access=_rw,
filesystem="gpfs-cfs",
performance_tier="medium",
quota_bytes=10 * 1024**4,
available_bytes=8 * 1024**4,
purge_policy_days=120,
shared=True,
),
]

# HPSS tape system: archive only; user accesses it through this resource_id
# (typically via login nodes or htar). Archive is rw from this resource.
self.locations[hpss.id] = [
storage_models.StorageInstance(
logical_name=storage_models.LogicalName.archive,
path="/home/{first}/{user}",
access=_rw,
filesystem="hpss",
performance_tier="tape",
quota_bytes=None,
available_bytes=None,
purge_policy_days=None,
shared=False,
),
]

# CFS / GPFS resource (queried via login nodes / DTN-style endpoint): all tiers rw,
# shared is read-only because it's the project-shared landing area.
self.locations[cfs.id] = [
storage_models.StorageInstance(
logical_name=storage_models.LogicalName.home,
path="/global/homes/{first}/{user}",
access=_rw,
filesystem="gpfs-homes",
performance_tier="medium",
quota_bytes=40 * 1024**3,
available_bytes=28 * 1024**3,
purge_policy_days=None,
shared=False,
),
storage_models.StorageInstance(
logical_name=storage_models.LogicalName.scratch,
path="/pscratch/sd/{first}/{user}",
access=_rw,
filesystem="lustre-scratch",
performance_tier="high",
quota_bytes=20 * 1024**4,
available_bytes=14 * 1024**4,
purge_policy_days=30,
shared=False,
),
storage_models.StorageInstance(
logical_name=storage_models.LogicalName.project,
path="/global/project/projectdirs/{project}/{user}",
access=_rw,
filesystem="gpfs-project",
performance_tier="medium",
quota_bytes=2 * 1024**4,
available_bytes=1024**4,
purge_policy_days=None,
shared=True,
),
storage_models.StorageInstance(
logical_name=storage_models.LogicalName.campaign,
path="/global/cfs/cdirs/{project}/campaign/{user}",
access=_rw,
filesystem="gpfs-cfs",
performance_tier="medium",
quota_bytes=10 * 1024**4,
available_bytes=8 * 1024**4,
purge_policy_days=120,
shared=True,
),
storage_models.StorageInstance(
logical_name=storage_models.LogicalName.shared,
path="/global/cfs/cdirs/{project}/shared",
access=_ro,
filesystem="gpfs-cfs",
performance_tier="medium",
quota_bytes=None,
available_bytes=None,
purge_policy_days=None,
shared=True,
),
storage_models.StorageInstance(
logical_name=storage_models.LogicalName.temporary,
path="/tmp/{user}",
access=_rw,
filesystem="tmpfs",
performance_tier="high",
quota_bytes=512 * 1024**3,
available_bytes=480 * 1024**3,
purge_policy_days=7,
shared=False,
),
]

# Login nodes: same filesystem layout as CFS — outside-of-job semantics for everything.
self.locations[login.id] = self.locations[cfs.id]

# Populate site resource_ids based on which resources are at each site
site1.resource_ids = [r.id for r in self.resources if r.site_id == site1.id]
site2.resource_ids = [r.id for r in self.resources if r.site_id == site2.id]
Expand Down Expand Up @@ -623,6 +775,90 @@ async def cancel_job(
# call slurm/etc. to cancel job
return True

# ----------------------------------------------
# Storage API
# ----------------------------------------------

@staticmethod
def _slugify_project(name: str) -> str:
"""Convert a project name to a path-safe slug (real facilities use codes like 'm1234')."""
return name.lower().replace(" ", "_")

def _user_project_codes(self, user: User) -> list[str]:
"""Return the path-slug codes of all projects the user belongs to."""
return [self._slugify_project(p.name) for p in self.projects if user.id in p.user_ids]

def _user_member_of(self, user: User, project_code: str) -> bool:
"""Authorization check: is the user a member of the named project?"""
return any(
user.id in p.user_ids and self._slugify_project(p.name) == project_code
for p in self.projects
)

def _resolve_path(self, template: str, user: User, project: str | None) -> str:
first = user.id[0] if user.id else "u"
path = template.replace("{user}", user.id).replace("{first}", first)
if project:
path = path.replace("{project}", project)
return path

def _apply_intent_filter(
self,
instance: storage_models.StorageInstance,
intent: storage_models.StorageIntent | None,
) -> bool:
"""Return False if this storage instance should be excluded for the given intent."""
if intent == storage_models.StorageIntent.long_term_storage:
return instance.logical_name == storage_models.LogicalName.archive
if intent == storage_models.StorageIntent.staging:
return instance.logical_name != storage_models.LogicalName.archive
if intent == storage_models.StorageIntent.write:
return instance.access.write
return True

async def get_locations(
self,
resource: status_models.Resource,
user: User,
logicalpath: storage_models.LogicalName | None,
project: str | None,
allocation: str | None,
intent: storage_models.StorageIntent | None,
) -> list[storage_models.StorageInstance]:
templates = self.locations.get(resource.id, [])
effective_project = project or allocation

# Authorization: a user can only resolve paths for their own projects
if effective_project and not self._user_member_of(user, effective_project):
raise HTTPException(status_code=403, detail=f"User is not a member of project '{effective_project}'")

# Expand project-scoped paths across ALL of the user's projects when none specified
project_codes = [effective_project] if effective_project else self._user_project_codes(user)

result = []
for m in templates:
if logicalpath and m.logical_name != logicalpath:
continue
if not self._apply_intent_filter(m, intent):
continue

is_project_scoped = "{project}" in m.path
expand_over = project_codes if is_project_scoped else [None]

for code in expand_over:
result.append(storage_models.StorageInstance(
logical_name=m.logical_name,
path=self._resolve_path(m.path, user, code),
filesystem=m.filesystem,
performance_tier=m.performance_tier,
quota_bytes=m.quota_bytes,
available_bytes=m.available_bytes,
purge_policy_days=m.purge_policy_days,
shared=m.shared,
access=m.access,
))
return result

def validate_path(self, path: str, allow_symlinks: bool = True) -> str:
"""Validate that the given path is within the sandbox base directory and optionally check for symlinks."""
basedir = PathSandbox.get_base_temp_dir()
Expand Down
2 changes: 2 additions & 0 deletions app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from app.routers.account import account
from app.routers.compute import compute
from app.routers.filesystem import filesystem
from app.routers.storage import storage
from app.routers.task import task

configure_logging(config.LOG_LEVEL)
Expand Down Expand Up @@ -81,6 +82,7 @@ async def dispatch(self, request: Request, call_next):
APP.include_router(account.router, prefix=api_prefix)
APP.include_router(compute.router, prefix=api_prefix)
APP.include_router(filesystem.router, prefix=api_prefix)
APP.include_router(storage.router, prefix=api_prefix)
APP.include_router(task.router, prefix=api_prefix)

logging.getLogger().info(f"API path: {api_prefix}")
Empty file added app/routers/storage/__init__.py
Empty file.
38 changes: 38 additions & 0 deletions app/routers/storage/facility_adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from abc import abstractmethod

from ...types.user import User
from ..status import models as status_models
from . import models as storage_models
from ..iri_router import AuthenticatedAdapter


class FacilityAdapter(AuthenticatedAdapter):
"""
Facility-specific storage location adapter.
Use the `IRI_API_ADAPTER_storage` environment variable
(defaults to `app.demo_adapter.DemoAdapter`) to install your implementation.
"""

@abstractmethod
async def get_locations(
self,
resource: status_models.Resource,
user: User,
logicalpath: storage_models.LogicalName | None,
project: str | None,
allocation: str | None,
intent: storage_models.StorageIntent | None,
) -> list[storage_models.StorageInstance]:
"""
Return resolved storage paths for the user at the given resource. The returned
instances also capture the access semantics of that resource context, so callers
do not need a separate mounts endpoint.
Results are optionally filtered by logical name, project/allocation, and intent.

Intent semantics:
- staging: exclude archive (too slow for staging workflows)
- long-term-storage: return only archive
- write: exclude paths that are read-only in a job context
- read: no filtering (all accessible paths)
"""
pass
Loading
Loading