Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crossflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@
together command-line driven tools.
"""

__version__ = "0.1.4"
__version__ = "0.1.5.dev0"
68 changes: 64 additions & 4 deletions crossflow/filehandling.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
filename_here = fh.save(filename_here)

they inherit from os.PathLike so can be used anywhere a conventional path can
be used:
be used e.g.

with open(fh) as f:
...
Expand Down Expand Up @@ -44,6 +44,7 @@ def set_stage_point(stage_point):
class FileHandler:
"""
Handle file operations

"""

def __init__(self, stage_point=None):
Expand All @@ -55,13 +56,27 @@ def __init__(self, stage_point=None):
def load(self, path):
"""
Method to load file.

args:
path (str): file path or URL

returns:
FileHandle: a FileHandle object

"""

return FileHandle(path, self.stage_point, must_exist=True)

def create(self, path):
"""
Method to load file.
Method to create a new file.

args:
path (str): file path

returns:
FileHandle: a FileHandle object

"""

return FileHandle(path, self.stage_point, must_exist=False)
Expand All @@ -70,14 +85,51 @@ def create(self, path):
class FileHandle:
"""
A portable container for a file.

a FileHandle object is instantiated with the path of an existing file on
an existing file system:

fh = FileHandle('/path/to/file')
or:
fh = FileHandle('http://example.com/file.txt')

and has a save() method that creates a local copy of that file:

filename_here = fh.save(filename_here)

FileHandle objects inherit from os.PathLike so can be used anywhere a
conventional path can be used:

with open(fh) as f:
...

They can also be used to read and write binary and text data directly:

data = fh.read_binary()
text = fh.read_text()
fh.write_binary(data)
fh.write_text(text)

"""

def __init__(self, path, stage_point, must_exist=True):
if not isinstance(path, (os.PathLike, str, bytes)):
raise IOError(f"Error - illegal argument type {type(path)} for {path}")
if must_exist:
if not os.path.exists(path):
raise IOError("Error - no such file")
if isinstance(path, str) and (
path.startswith("http://")
or path.startswith("https://")
or path.startswith("ftp://")
):
try:
source = fsspec.open(path)
with source as s:
s.read(1)
except Exception as e:
raise IOError("Error - no such file") from e
else:
if not os.path.exists(path):
raise IOError("Error - no such file")
source = fsspec.open(path)
ext = os.path.splitext(path)[1]
self.path = path
Expand Down Expand Up @@ -180,6 +232,10 @@ def read_text(self):
def write_binary(self, data):
"""
A method for writing binary file formats

args:
data (bytes): binary data to write

"""

compressed_data = zlib.compress(data)
Expand All @@ -194,6 +250,10 @@ def write_binary(self, data):
def write_text(self, text):
"""
A wrapper for writing binary formatted text.

args:
text (str): text data to write

"""

self.write_binary(text.encode("utf-8"))
41 changes: 38 additions & 3 deletions crossflow/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,19 @@ def _gen_filenames(pattern, n_files):
class SubprocessTask:
"""
A task that runs a command-line executable

Methods:
set_inputs: set the inputs the task requires
set_outputs: set the outputs the task produces
set_constant: set a constant for the task
run: execute the task

"""

def __init__(self, template):
"""
Arguments:
Initialize the SubprocessTask.
args:
template (str): a template for the command to be executed
"""
self.template = template
Expand All @@ -66,6 +74,10 @@ def __call__(self, *args):
def set_inputs(self, inputs):
"""
Set the inputs the task requires

args:
inputs (list): a list of input variable names

"""
if not isinstance(inputs, list):
raise TypeError(
Expand All @@ -76,6 +88,10 @@ def set_inputs(self, inputs):
def set_outputs(self, outputs):
"""
Set the outputs the task produces

args:
outputs (list): a list of output variable names

"""
if not isinstance(outputs, list):
raise TypeError(
Expand All @@ -88,6 +104,11 @@ def set_constant(self, key, value):
Set a constant for the task
If it was previously defined as an input variable, remove it from
that list.

args:
key (str): the name of the constant
value (str): the value of the constant

"""
d = {"name": key}
try:
Expand Down Expand Up @@ -194,7 +215,8 @@ class FunctionTask:

def __init__(self, func):
"""
Arguments:
Initialize the FunctionTask.
args:
func: the Python function to wrap
"""
self.func = func
Expand All @@ -210,18 +232,31 @@ def __call__(self, *args):
def set_inputs(self, inputs):
"""
Set the inputs the task requires

args:
inputs (list): a list of input variable names

"""
self.inputs = inputs

def set_outputs(self, outputs):
"""
Set the outputs the task produces

args:
outputs (list): a list of output variable names

"""
self.outputs = outputs

def set_constant(self, key, value):
"""
Set a parameters for the task
Set a constant for the task

args:
key (str): the name of the constant
value (str): the value of the constant

"""
try:
self.constants[key] = self.filehandler.load(value)
Expand Down
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ dependencies = [
"dask",
"distributed",
"fsspec",
"requests",
"aiohttp"
]

[project.urls]
Expand Down
7 changes: 7 additions & 0 deletions tests/test_filehandling.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@ def test_file_protocol(tmpdir):
assert pf.read_text() == "content"


def test_url_protocol():
fh = filehandling.FileHandler()
f = fh.load(
"https://raw.githubusercontent.com/HECBioSim/crossflow/refs/heads/main/README.md"
)


"""
def test_s3_protocol(tmpdir):
d = tmpdir.mkdir('sub')
Expand Down
Loading