Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dev/fsspec_inspector/generate_flavours.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ def _fix_oss_file_system(x: str) -> str:
def _fix_xrootd_file_system(x: str) -> str:
x = re.sub(
r"return client[.]URL\(path\)[.]path_with_params",
"x = urlsplit(path); return (x.path + f'?{x.query}' if x.query else '')",
"x = urlsplit(path); return (x.path + (f'?{x.query}' if x.query else '')).removeprefix('/')", # noqa: E501
x,
)
x = re.sub(r"client[.]URL\(u\)", "urlsplit(u)", x)
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ dev = [
"adlfs >=2024",
"huggingface_hub",
"webdav4[fsspec]",
"fsspec-xrootd",
# testing
"moto[s3,server]",
"wsgidav",
Expand Down
7 changes: 6 additions & 1 deletion upath/_chain.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,12 @@ def chain(self, segments: Sequence[ChainSegment]) -> tuple[str, dict[str, Any]]:
if segment.path.startswith(f"{segment.protocol}:/"):
urlpath = segment.path
else:
urlpath = f"{segment.protocol}://{segment.path}"
# If "hostid" is specified in the storage options then insert it
# relevant for XRootDPath
if (hostid := segment.storage_options.get("hostid")) is not None:
urlpath = f"{segment.protocol}://{hostid}/{segment.path}"
else:
urlpath = f"{segment.protocol}://{segment.path}"
elif segment.protocol:
urlpath = segment.protocol
elif segment.path is not None:
Expand Down
4 changes: 2 additions & 2 deletions upath/_flavour_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -988,7 +988,7 @@ def _strip_protocol(cls, path: str) -> str:

class XRootDFileSystemFlavour(AbstractFileSystemFlavour):
__orig_class__ = 'fsspec_xrootd.xrootd.XRootDFileSystem'
__orig_version__ = '0.5.1'
__orig_version__ = '0.5.5'
protocol = ('root',)
root_marker = '/'
sep = '/'
Expand All @@ -997,7 +997,7 @@ class XRootDFileSystemFlavour(AbstractFileSystemFlavour):
def _strip_protocol(cls, path: str | list[str]) -> Any:
if isinstance(path, str):
if path.startswith(cls.protocol):
x = urlsplit(path); return (x.path + f'?{x.query}' if x.query else '').rstrip("/") or cls.root_marker
x = urlsplit(path); return (x.path + (f'?{x.query}' if x.query else '')).removeprefix('/').rstrip("/") or cls.root_marker
# assume already stripped
return path.rstrip("/") or cls.root_marker
elif isinstance(path, list):
Expand Down
46 changes: 46 additions & 0 deletions upath/implementations/xrootd.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from __future__ import annotations

import sys
from typing import TYPE_CHECKING

from upath.core import UPath
from upath.types import JoinablePathLike

if TYPE_CHECKING:
from typing import Literal

if sys.version_info >= (3, 11):
from typing import Unpack
else:
from typing_extensions import Unpack

from upath._chain import FSSpecChainParser
from upath.types.storage_options import XRootDStorageOptions

__all__ = ["XRootDPath"]


class XRootDPath(UPath):
__slots__ = ()

if TYPE_CHECKING:

def __init__(
self,
*args: JoinablePathLike,
protocol: Literal["root"] | None = ...,
chain_parser: FSSpecChainParser = ...,
**storage_options: Unpack[XRootDStorageOptions],
) -> None: ...

def mkdir(
self, mode: int = 0o777, parents: bool = False, exist_ok: bool = False
) -> None:
if not parents and not exist_ok and self.exists():
raise FileExistsError(self.path)
super().mkdir(mode=mode, parents=parents, exist_ok=exist_ok)

def write_bytes(self, data: bytes) -> int:
with self.fs.open(self.path, "wb") as f:
rv = f.write(data)
return rv
4 changes: 4 additions & 0 deletions upath/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
from upath.implementations.smb import SMBPath as _SMBPath
from upath.implementations.tar import TarPath as _TarPath
from upath.implementations.webdav import WebdavPath as _WebdavPath
from upath.implementations.xrootd import XRootDPath as _XRootDPath
from upath.implementations.zip import ZipPath as _ZipPath


Expand Down Expand Up @@ -102,6 +103,7 @@ class _Registry(MutableMapping[str, "type[upath.UPath]"]):
"http": "upath.implementations.http.HTTPPath",
"https": "upath.implementations.http.HTTPPath",
"memory": "upath.implementations.memory.MemoryPath",
"root": "upath.implementations.xrootd.XRootDPath",
"s3": "upath.implementations.cloud.S3Path",
"s3a": "upath.implementations.cloud.S3Path",
"simplecache": "upath.implementations.cached.SimpleCachePath",
Expand Down Expand Up @@ -269,6 +271,8 @@ def get_upath_class(protocol: Literal["file", "local"]) -> type[_FilePath]: ...
@overload
def get_upath_class(protocol: Literal["memory"]) -> type[_MemoryPath]: ...
@overload
def get_upath_class(protocol: Literal["root"]) -> type[_XRootDPath]: ...
@overload
def get_upath_class(protocol: Literal["sftp", "ssh"]) -> type[_SFTPPath]: ...
@overload
def get_upath_class(protocol: Literal["smb"]) -> type[_SMBPath]: ...
Expand Down
45 changes: 45 additions & 0 deletions upath/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -742,3 +742,48 @@ def ftp_server(ftp_server_process):
del_func(file_path)
except Exception:
pass


@pytest.fixture(scope="module")
def xrootd_server():
if shutil.which("docker") is None:
pytest.skip("docker not installed")

name = "fsspec_test_xrootd"
stop_docker(name)
cmd = (
"docker run"
" -d"
f" --name {name}"
f" -u {os.getuid()}"
" -p 1094:1094"
" --tmpfs /shared"
" ponyisi/xrootd_docker xrootd /shared"
)
try:
subprocess.run(shlex.split(cmd))
yield "root://localhost//shared"
finally:
stop_docker(name)


@pytest.fixture
def xrootd_fixture(local_testdir, xrootd_server):
xrootd_url = xrootd_server
fs = fsspec.filesystem("root", hostid="localhost")
fs.clear_instance_cache()
pth_testdir = Path(local_testdir)
dirname = pth_testdir.parent
# XRootD filesystem does not support put() so copy files the hard way
for pth_tup in os.walk(local_testdir):
tdir = xrootd_url + "/" + str(Path(pth_tup[0]).relative_to(dirname))
fs.mkdir("/shared/" + str(Path(pth_tup[0]).relative_to(dirname)))
for fpath in pth_tup[2]:
with (Path(pth_tup[0]) / fpath).open("rb") as infile:
with fsspec.open(tdir + "/" + fpath, mode="wb") as outfile:
outfile.write(infile.read())

try:
yield xrootd_url + "/" + Path(local_testdir).name
finally:
fs.rm("/shared/" + Path(local_testdir).name, recursive=True)
18 changes: 18 additions & 0 deletions upath/tests/implementations/test_xrootd.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import pytest

from upath import UPath
from upath.implementations.xrootd import XRootDPath

from ..cases import BaseTests
from ..utils import OverrideMeta
from ..utils import overrides_base


class TestUPathXRootD(BaseTests, metaclass=OverrideMeta):
@pytest.fixture(autouse=True, scope="function")
def path(self, xrootd_fixture):
self.path = UPath(xrootd_fixture)

@overrides_base
def test_is_correct_class(self):
assert isinstance(self.path, XRootDPath)
1 change: 1 addition & 0 deletions upath/tests/test_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
"https",
"local",
"memory",
"root",
"s3",
"s3a",
"simplecache",
Expand Down
20 changes: 20 additions & 0 deletions upath/types/storage_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,26 @@ class SMBStorageOptions(_AbstractStorageOptions, total=False):
auto_mkdir: bool # Whether to create parent directories when opening files


class XRootDStorageOptions(_AbstractStorageOptions, total=False):
"""Storage options for XRootD filesystem"""

# Connection settings
hostid: str # The remote server to connect to (default from )
timeout: int # Connection timeout; 0 means no timeout (default: 0)

# File handle cache settings
filehandle_cache_size: int # Maximum number of items (default: 256)
filehandle_cache_ttl: int # TTL in seconds (default: 30)

# Source lookup settings
locate_all_sources: (
bool # For reading, find all locations hosting file, ignoring redirector
)
valid_sources: (
list[str] | None
) # If given and locate_all_sources is True, restrict sources to this server list


class WebdavStorageOptions(_AbstractStorageOptions, total=False):
"""Storage options for WebDAV filesystem"""

Expand Down