From 3fe73e62a5ece7d6484a88c1221c70800938b44f Mon Sep 17 00:00:00 2001 From: Justas Balcas Date: Wed, 13 May 2026 12:47:16 -0500 Subject: [PATCH 1/2] URN Implementation --- app/demo_adapter.py | 7 +- app/routers/account/models.py | 4 +- app/routers/filesystem/models.py | 16 +- app/routers/status/facility_adapter.py | 2 +- app/routers/status/models.py | 20 +-- app/routers/status/status.py | 12 +- app/types/models.py | 4 +- app/types/scalars.py | 200 ++++++++++++++++++++++++- 8 files changed, 218 insertions(+), 47 deletions(-) diff --git a/app/demo_adapter.py b/app/demo_adapter.py index fa9a23ae..43bd1c16 100644 --- a/app/demo_adapter.py +++ b/app/demo_adapter.py @@ -15,7 +15,6 @@ import uuid from fastapi import HTTPException -from fastapi.encoders import jsonable_encoder from pydantic import BaseModel from .routers.account import facility_adapter as account_adapter @@ -365,8 +364,8 @@ async def list_sites( sites = [s for s in sites if s.last_modified > ms] o = offset or 0 - l = limit or len(sites) - return sites[o : o + l] + page_limit = limit or len(sites) + return sites[o : o + page_limit] async def get_site(self: "DemoAdapter", site_id: str, modified_since: str | None = None) -> facility_models.Site: site = next((s for s in self.sites if s.id == site_id), None) @@ -392,7 +391,7 @@ async def get_resources( description: str | None = None, group: str | None = None, modified_since: datetime.datetime | None = None, - resource_type: status_models.ResourceType | None = None, + resource_type: status_models.ResourceTypeValue | None = None, current_status: status_models.Status | None = None, capability: Capability | None = None, site_id: str | None = None, diff --git a/app/routers/account/models.py b/app/routers/account/models.py index e4c6d4f7..f8e80f92 100644 --- a/app/routers/account/models.py +++ b/app/routers/account/models.py @@ -4,7 +4,7 @@ from ...request_context import get_url_prefix from ...types.base import IRIBaseModel -from ...types.scalars import AllocationUnit +from ...types.scalars import AllocationUnit, AllocationUnitValue class Project(IRIBaseModel): @@ -34,7 +34,7 @@ class AllocationEntry(IRIBaseModel): allocation: float = Field(..., description="Total allocation amount granted.", example=100000.0) # how much this allocation can spend usage: float = Field(..., description="Amount of allocation consumed.", example=52342.5) # how much this allocation has spent - unit: AllocationUnit = Field(..., description="Unit of the allocation (e.g., node_hours, bytes).", example="node_hours") + unit: AllocationUnitValue = Field(..., description="DOE IRI URN for the allocation unit.", example=AllocationUnit.node_hours) class ProjectAllocation(IRIBaseModel): diff --git a/app/routers/filesystem/models.py b/app/routers/filesystem/models.py index fc36ebca..79d5c271 100644 --- a/app/routers/filesystem/models.py +++ b/app/routers/filesystem/models.py @@ -9,13 +9,7 @@ from enum import Enum from pydantic import Field, AliasChoices, BaseModel - -class CompressionType(str, Enum): - """Defines the type of compression to be used for compressing or extracting files.""" - none = "none" - bzip2 = "bzip2" - gzip = "gzip" - xz = "xz" +from ...types.scalars import CompressionType, CompressionTypeValue class ContentUnit(str, Enum): @@ -202,7 +196,7 @@ class PostCompressRequest(FilesystemRequestBase): target_path: str = Field(..., description="Path to the compressed file", example="/home/user/file.tar.gz") match_pattern: str|None = Field(default=None, description="Regex pattern to filter files to compress", example=".*\\.txt$") dereference: bool = Field(default=False, description="If set to `true`, it follows symbolic links and archive the files they point to instead of the links themselves.", example=True) - compression: CompressionType = Field(default="gzip", description="Defines the type of compression to be used. By default gzip is used.", example="gzip") + compression: CompressionTypeValue = Field(default=CompressionType.gzip, description="DOE IRI URN for the compression type. Legacy short tokens are accepted only as input compatibility aliases and are normalized.", example=CompressionType.gzip) model_config = { "json_schema_extra": { "examples": [ @@ -211,7 +205,7 @@ class PostCompressRequest(FilesystemRequestBase): "target_path": "/home/user/file.tar.gz", "match_pattern": "*./[ab].*\\.txt", "dereference": "true", - "compression": "none", + "compression": CompressionType.none, } ] } @@ -226,14 +220,14 @@ class PostExtractResponse(BaseModel): class PostExtractRequest(FilesystemRequestBase): """Represents a request to extract a compressed file.""" target_path: str = Field(..., description="Path to the directory where to extract the compressed file", example="/home/user/dir") - compression: CompressionType = Field(default="gzip", description="Defines the type of compression to be used. By default gzip is used.", example="gzip") + compression: CompressionTypeValue = Field(default=CompressionType.gzip, description="DOE IRI URN for the compression type. Legacy short tokens are accepted only as input compatibility aliases and are normalized.", example=CompressionType.gzip) model_config = { "json_schema_extra": { "examples": [ { "source_path": "/home/user/dir/file.tar.gz", "target_path": "/home/user/dir", - "compression": "none", + "compression": CompressionType.none, } ] } diff --git a/app/routers/status/facility_adapter.py b/app/routers/status/facility_adapter.py index 65b87c4c..29475850 100644 --- a/app/routers/status/facility_adapter.py +++ b/app/routers/status/facility_adapter.py @@ -21,7 +21,7 @@ async def get_resources( description: str | None = None, group: str | None = None, modified_since: datetime.datetime | None = None, - resource_type: status_models.ResourceType|None = None, + resource_type: status_models.ResourceTypeValue | None = None, current_status: status_models.Status|None = None, capability: Capability | None = None, site_id: str | None = None, diff --git a/app/routers/status/models.py b/app/routers/status/models.py index 4fda8015..38e986af 100644 --- a/app/routers/status/models.py +++ b/app/routers/status/models.py @@ -6,6 +6,7 @@ from ...request_context import get_url_prefix from ...types.base import NamedObject +from ...types.scalars import ResourceType, ResourceTypeValue, canonicalize_resource_type, urn_has_complete_prefix class Status(enum.Enum): @@ -15,18 +16,6 @@ class Status(enum.Enum): degraded = "degraded" unknown = "unknown" - -class ResourceType(enum.Enum): - """Represents the type of a resource.""" - website = "website" - service = "service" - compute = "compute" - system = "system" - storage = "storage" - network = "network" - unknown = "unknown" - - class Resource(NamedObject): """Represents a resource in the system.""" def _self_path(self) -> str: @@ -37,7 +26,7 @@ def _self_path(self) -> str: capability_ids: list[str] = Field(default_factory=list, exclude=True) group: str|None = Field(default=None, description="Logical grouping of the resource", example="frontend") current_status: Status|None = Field(default=None, description="The current status comes from the status of the last event for this resource", example="up") - resource_type: ResourceType = Field(..., description="Type of the resource", example="service") + resource_type: ResourceTypeValue = Field(..., description="DOE IRI URN for the resource type", example=ResourceType.service) @computed_field(description="URI of the site where this resource is located") @property @@ -57,9 +46,8 @@ def find(cls, items, name=None, description=None, modified_since=None, group=Non if group: items = [item for item in items if item.group == group] if resource_type: - if isinstance(resource_type, str): - resource_type = ResourceType(resource_type) - items = [item for item in items if item.resource_type == resource_type] + resource_type = canonicalize_resource_type(resource_type) + items = [item for item in items if urn_has_complete_prefix(resource_type, item.resource_type)] if current_status: items = [item for item in items if item.current_status == current_status] if capability: diff --git a/app/routers/status/status.py b/app/routers/status/status.py index ce5958e4..7da51f43 100644 --- a/app/routers/status/status.py +++ b/app/routers/status/status.py @@ -3,7 +3,7 @@ from fastapi import Depends, HTTPException, Query, Request from ...types.http import forbidExtraQueryParams -from ...types.scalars import AllocationUnit, StrictDateTime +from ...types.scalars import AllocationUnitValue, StrictDateTime, doe_iri_domain_urn_min_length, doe_iri_domain_urn_schema_pattern from .. import iri_router from ..error_handlers import DEFAULT_RESPONSES from ..iri_meta import iri_meta_dict @@ -33,9 +33,15 @@ async def get_resources( offset: int = Query(default=0, ge=0), limit: int = Query(default=100, ge=0, le=1000), modified_since: StrictDateTime = Query(default=None), - resource_type: models.ResourceType = Query(default=None), + resource_type: models.ResourceTypeValue = Query( + default=None, + min_length=doe_iri_domain_urn_min_length("resource"), + pattern=doe_iri_domain_urn_schema_pattern("resource"), + description="DOE IRI resource type URN. Legacy short tokens are accepted only as input compatibility aliases and are normalized to canonical URNs.", + examples=[models.ResourceType.compute, models.ResourceType.storage], + ), current_status: models.Status = Query(default=None), - capability: List[AllocationUnit] = Query(default=None, min_length=1), + capability: List[AllocationUnitValue] = Query(default=None, min_length=1), _forbid=Depends(forbidExtraQueryParams("name", "description", "group", "offset", "limit", "modified_since", "resource_type", "current_status", "capability", multiParams={"capability"})), ) -> list[models.Resource]: return await router.adapter.get_resources( diff --git a/app/types/models.py b/app/types/models.py index 113e5dc5..67bf36bd 100644 --- a/app/types/models.py +++ b/app/types/models.py @@ -3,7 +3,7 @@ from pydantic import Field from .base import NamedObject -from .scalars import AllocationUnit, StrictDateTime +from .scalars import AllocationUnit, AllocationUnitValue, StrictDateTime class Capability(NamedObject): @@ -20,4 +20,4 @@ def _self_path(self) -> str: last_modified: StrictDateTime|None = Field(default=None, description="ISO 8601 timestamp when this object was last modified.", example="2026-02-21T12:00:00Z") - units: list[AllocationUnit] = Field(..., description="Allocation units supported by this capability", example=["node_hours"]) + units: list[AllocationUnitValue] = Field(..., description="Allocation units supported by this capability", example=[AllocationUnit.node_hours]) diff --git a/app/types/scalars.py b/app/types/scalars.py index 365be066..ecca3cc0 100644 --- a/app/types/scalars.py +++ b/app/types/scalars.py @@ -2,8 +2,10 @@ # pylint: disable=unused-argument import datetime -import enum +import re +from typing import Annotated +from pydantic import BeforeValidator, WithJsonSchema from pydantic_core import core_schema @@ -82,11 +84,193 @@ def __get_pydantic_json_schema__(cls, schema, handler): return {"type": "string", "format": "date-time", "description": "Strict ISO8601 datetime. Only valid ISO8601 datetime strings are accepted.", "example": "2026-02-21T12:00:00Z"} -# ----------------------------------------------------------------------- -# AllocationUnit: an enum for allocation units -class AllocationUnit(enum.Enum): - """Units for allocation""" +DOE_IRI_URN_PREFIX = "urn:doe-iri:" +_DOMAIN = r"[A-Za-z0-9][A-Za-z0-9-]{0,31}" +_SEGMENT_CHAR = r"(?:[A-Za-z0-9._~-]|%[0-9A-Fa-f]{2}|[!$&'()*+,;=@]|/)" +_DOMAIN_SPECIFIC_SEGMENT = rf"{_SEGMENT_CHAR}+" +_DOMAIN_SPECIFIC_STRING = rf"{_DOMAIN_SPECIFIC_SEGMENT}(?::{_DOMAIN_SPECIFIC_SEGMENT})*" +DOE_IRI_URN_SCHEMA_PATTERN = rf"^{DOE_IRI_URN_PREFIX}{_DOMAIN}:{_DOMAIN_SPECIFIC_STRING}$" +DOE_IRI_URN_PATTERN = re.compile(rf"^{DOE_IRI_URN_PREFIX}(?P{_DOMAIN}):(?P{_DOMAIN_SPECIFIC_STRING})$") + +CANONICAL_RESOURCE_TYPES = { + "website": "urn:doe-iri:resource:website", + "service": "urn:doe-iri:resource:service", + "compute": "urn:doe-iri:resource:compute", + "system": "urn:doe-iri:resource:system", + "storage": "urn:doe-iri:resource:storage", + "network": "urn:doe-iri:resource:network", + "unknown": "urn:doe-iri:resource:unknown", +} + +CANONICAL_ALLOCATION_UNITS = { + "node-hours": "urn:doe-iri:allocation:compute:node-hours", + "bytes": "urn:doe-iri:allocation:storage:bytes", + "inodes": "urn:doe-iri:allocation:storage:inodes", +} + +CANONICAL_COMPRESSION_TYPES = { + "none": "urn:doe-iri:compression:none", + "bzip2": "urn:doe-iri:compression:bzip2", + "gzip": "urn:doe-iri:compression:gzip", + "xz": "urn:doe-iri:compression:xz", +} + +LEGACY_RESOURCE_TYPE_MAP = { + **CANONICAL_RESOURCE_TYPES, +} + +LEGACY_ALLOCATION_UNIT_MAP = { + **CANONICAL_ALLOCATION_UNITS, + "node_hours": CANONICAL_ALLOCATION_UNITS["node-hours"], +} + +LEGACY_COMPRESSION_TYPE_MAP = { + **CANONICAL_COMPRESSION_TYPES, +} + + +def _ensure_text(value, label: str) -> str: + if isinstance(value, str): + candidate = value.strip() + if candidate: + return candidate + raise ValueError(f"Invalid {label}. Expected a non-empty string.") + + +def validate_doe_iri_urn(value: str) -> str: + """Validate a DOE IRI URN string.""" + candidate = _ensure_text(value, "DOE IRI URN") + if not DOE_IRI_URN_PATTERN.fullmatch(candidate): + raise ValueError("Invalid DOE IRI URN. Expected format urn:doe-iri::.") + return candidate + + +def doe_iri_domain_urn_schema_pattern(domain: str) -> str: + """Return the JSON schema pattern for DOE IRI URNs in one domain.""" + return rf"^{DOE_IRI_URN_PREFIX}{domain}:{_DOMAIN_SPECIFIC_STRING}$" + + +def doe_iri_domain_urn_min_length(domain: str) -> int: + """Return the minimum length for DOE IRI URNs in one domain.""" + return len(f"{DOE_IRI_URN_PREFIX}{domain}:") + 1 + + +def _domain_urn_schema(domain: str, description: str, examples: list[str]) -> dict[str, object]: + return { + "type": "string", + "minLength": doe_iri_domain_urn_min_length(domain), + "pattern": doe_iri_domain_urn_schema_pattern(domain), + "description": description, + "examples": examples, + } + + +def _get_doe_iri_domain(value: str) -> str: + return validate_doe_iri_urn(value).split(":", 3)[2] + + +def urn_has_complete_prefix(parent_urn: str, candidate_urn: str) -> bool: + """Return True when parent_urn is an exact or parent segment match of candidate_urn.""" + parent_segments = validate_doe_iri_urn(parent_urn).split(":") + candidate_segments = validate_doe_iri_urn(candidate_urn).split(":") + if len(parent_segments) > len(candidate_segments): + return False + return candidate_segments[: len(parent_segments)] == parent_segments + + +def _coerce_domain_urn(value: str, domain: str, legacy_map: dict[str, str], label: str) -> str: + candidate = _ensure_text(value, label) + if not candidate.startswith("urn:"): + mapped = legacy_map.get(candidate) + if mapped: + return mapped + raise ValueError(f"Invalid {label}. Expected a DOE IRI URN or one of: {', '.join(sorted(legacy_map))}.") + + urn = validate_doe_iri_urn(candidate) + urn_domain = _get_doe_iri_domain(urn) + if urn_domain != domain: + raise ValueError(f"Invalid {label}. Expected DOE IRI URN domain '{domain}', got '{urn_domain}'.") + return urn + + +def canonicalize_resource_type(value: str) -> str: + """Return the canonical DOE IRI resource type URN.""" + return _coerce_domain_urn(value, "resource", LEGACY_RESOURCE_TYPE_MAP, "resource type") + + +def canonicalize_allocation_unit(value: str) -> str: + """Return the canonical DOE IRI allocation-unit URN.""" + return _coerce_domain_urn(value, "allocation", LEGACY_ALLOCATION_UNIT_MAP, "allocation unit") + + +def canonicalize_compression_type(value: str) -> str: + """Return the canonical DOE IRI compression URN.""" + return _coerce_domain_urn(value, "compression", LEGACY_COMPRESSION_TYPE_MAP, "compression type") + + +class ResourceType: + """Canonical DOE IRI resource type URNs.""" + + website = CANONICAL_RESOURCE_TYPES["website"] + service = CANONICAL_RESOURCE_TYPES["service"] + compute = CANONICAL_RESOURCE_TYPES["compute"] + system = CANONICAL_RESOURCE_TYPES["system"] + storage = CANONICAL_RESOURCE_TYPES["storage"] + network = CANONICAL_RESOURCE_TYPES["network"] + unknown = CANONICAL_RESOURCE_TYPES["unknown"] + + +ResourceTypeValue = Annotated[ + str, + BeforeValidator(canonicalize_resource_type), + WithJsonSchema( + _domain_urn_schema( + "resource", + "DOE IRI resource type URN. Legacy short tokens are accepted only as input compatibility aliases and are normalized to canonical URNs.", + [ResourceType.compute, ResourceType.storage], + ) + ), +] + + +class AllocationUnit: + """Canonical DOE IRI allocation-unit URNs.""" + + node_hours = CANONICAL_ALLOCATION_UNITS["node-hours"] + bytes = CANONICAL_ALLOCATION_UNITS["bytes"] + inodes = CANONICAL_ALLOCATION_UNITS["inodes"] + + +AllocationUnitValue = Annotated[ + str, + BeforeValidator(canonicalize_allocation_unit), + WithJsonSchema( + _domain_urn_schema( + "allocation", + "DOE IRI allocation-unit URN. Legacy short tokens are accepted only as input compatibility aliases and are normalized to canonical URNs.", + [AllocationUnit.node_hours, AllocationUnit.bytes], + ) + ), +] + + +class CompressionType: + """Canonical DOE IRI compression URNs.""" + + none = CANONICAL_COMPRESSION_TYPES["none"] + bzip2 = CANONICAL_COMPRESSION_TYPES["bzip2"] + gzip = CANONICAL_COMPRESSION_TYPES["gzip"] + xz = CANONICAL_COMPRESSION_TYPES["xz"] + - node_hours = "node_hours" - bytes = "bytes" - inodes = "inodes" +CompressionTypeValue = Annotated[ + str, + BeforeValidator(canonicalize_compression_type), + WithJsonSchema( + _domain_urn_schema( + "compression", + "DOE IRI compression URN. Legacy short tokens are accepted only as input compatibility aliases and are normalized to canonical URNs.", + [CompressionType.gzip, CompressionType.none], + ) + ), +] From 2594c229d87e1f65abff14f1e4df4bfdfd2ee47b Mon Sep 17 00:00:00 2001 From: Justas Balcas Date: Tue, 19 May 2026 06:30:37 -0500 Subject: [PATCH 2/2] Add urn test file --- test/test_urn_types.py | 143 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 143 insertions(+) create mode 100644 test/test_urn_types.py diff --git a/test/test_urn_types.py b/test/test_urn_types.py new file mode 100644 index 00000000..42dca853 --- /dev/null +++ b/test/test_urn_types.py @@ -0,0 +1,143 @@ +#!/usr/bin/env python3 +"""Focused DOE IRI URN regression tests.""" + +import unittest + +from pydantic import TypeAdapter + +from app.routers.filesystem import models as filesystem_models +from app.routers.status import models as status_models +from app.types.scalars import ( + AllocationUnit, + AllocationUnitValue, + CompressionType, + CompressionTypeValue, + ResourceType, + ResourceTypeValue, + canonicalize_allocation_unit, + validate_doe_iri_urn, + urn_has_complete_prefix, +) + + +class DoeIriUrnTests(unittest.TestCase): + def test_allocation_unit_legacy_token_normalizes_to_canonical_urn(self): + self.assertEqual( + canonicalize_allocation_unit("node_hours"), + AllocationUnit.node_hours, + ) + + def test_resource_model_normalizes_legacy_type_to_canonical_urn(self): + resource = status_models.Resource( + id="resource-1", + site_id="site-1", + capability_ids=[], + name="GPU Partition", + description="Compute partition", + last_modified="2026-05-12T12:00:00Z", + current_status=status_models.Status.up, + resource_type="compute", + ) + self.assertEqual(resource.resource_type, ResourceType.compute) + + def test_unregistered_facility_resource_urn_is_accepted(self): + resource = status_models.Resource( + id="resource-1", + site_id="site-1", + capability_ids=[], + name="XRootD Endpoint", + description="Facility-local XRootD resource", + last_modified="2026-05-12T12:00:00Z", + current_status=status_models.Status.up, + resource_type="urn:doe-iri:resource:xrootd", + ) + self.assertEqual(resource.resource_type, "urn:doe-iri:resource:xrootd") + + def test_resource_find_supports_parent_prefix_matching(self): + parent = status_models.Resource( + id="resource-1", + site_id="site-1", + capability_ids=[], + name="Scratch", + description="Scratch filesystem", + last_modified="2026-05-12T12:00:00Z", + current_status=status_models.Status.up, + resource_type="urn:doe-iri:resource:storage:filesystem:scratch", + ) + matches = status_models.Resource.find([parent], resource_type=ResourceType.storage) + self.assertEqual([item.id for item in matches], ["resource-1"]) + + def test_unregistered_resource_subtype_matches_registered_parent(self): + resource = status_models.Resource( + id="resource-1", + site_id="site-1", + capability_ids=[], + name="XRootD Storage", + description="Facility-local storage subtype", + last_modified="2026-05-12T12:00:00Z", + current_status=status_models.Status.up, + resource_type="urn:doe-iri:resource:storage:xrootd", + ) + matches = status_models.Resource.find([resource], resource_type=ResourceType.storage) + self.assertEqual([item.id for item in matches], ["resource-1"]) + + def test_prefix_matching_requires_complete_segments(self): + self.assertFalse( + urn_has_complete_prefix( + "urn:doe-iri:resource:stor", + "urn:doe-iri:resource:storage:filesystem:scratch", + ) + ) + + def test_domain_specific_string_allows_rfc8141_slash(self): + self.assertEqual( + validate_doe_iri_urn("urn:doe-iri:resource:facility-code/scanner"), + "urn:doe-iri:resource:facility-code/scanner", + ) + + def test_domain_specific_string_rejects_empty_hierarchy_segments(self): + invalid_values = [ + "urn:doe-iri:resource::xrootd", + "urn:doe-iri:resource:storage::xrootd", + "urn:doe-iri:resource:storage:", + "urn:doe-iri:resource::", + ] + for value in invalid_values: + with self.subTest(value=value): + with self.assertRaises(ValueError): + validate_doe_iri_urn(value) + + def test_typed_urn_schemas_include_openapi_hints(self): + cases = [ + (ResourceTypeValue, "resource", ResourceType.storage), + (AllocationUnitValue, "allocation", AllocationUnit.node_hours), + (CompressionTypeValue, "compression", CompressionType.gzip), + ] + for type_annotation, domain, example in cases: + with self.subTest(domain=domain): + schema = TypeAdapter(type_annotation).json_schema() + self.assertEqual(schema["type"], "string") + self.assertEqual(schema["minLength"], len(f"urn:doe-iri:{domain}:") + 1) + self.assertIn("pattern", schema) + self.assertRegex(example, schema["pattern"]) + self.assertIn("input compatibility aliases", schema["description"]) + + def test_filesystem_request_normalizes_legacy_compression_token(self): + request = filesystem_models.PostCompressRequest( + path="/tmp/src", + target_path="/tmp/out.tar.gz", + compression="gzip", + ) + self.assertEqual(request.compression, CompressionType.gzip) + + def test_filesystem_request_rejects_wrong_urn_domain(self): + with self.assertRaises(ValueError): + filesystem_models.PostExtractRequest( + path="/tmp/archive.tar", + target_path="/tmp/out", + compression=ResourceType.storage, + ) + + +if __name__ == "__main__": + unittest.main()