.zarr.zip on s3 #1613
Replies: 5 comments 4 replies
-
This is great @jeffpeck10x! Would you be willing to add this to the tutorial? I wonder whether the built-in ZipStore has any use for reading from S3. Or is fsspec's ZipFileSystem always required here?
-
An additional option would be to use
With thanks to @caviere (see https://github.com/zarr-developers/outreachy_2022_testing_zipstore/blob/main/real%20%20world%20data/main.py#L54). Regardless, 👍 for adding your version and/or this one to the tutorial, in whichever location would have helped you find them! Thanks.
-
Made a small PR to update the docs: #1615
-
With Zarr v3, I used the following code to create a read-only Zarr store from a zipped Zarr in S3:

import zarr
import s3fs

class S3ZipStore(zarr.storage.ZipStore):
    def __init__(self, path: s3fs.S3File) -> None:
        super().__init__(path="", mode="r")
        # Replace the dummy path with the open S3 file object
        self.path = path

# S3_ENDPOINT, S3_BUCKET and ZIP_PATH are placeholders for your own values
s3 = s3fs.S3FileSystem(anon=True, endpoint_url=S3_ENDPOINT, asynchronous=False)
file = s3.open(f"s3://{S3_BUCKET}/{ZIP_PATH}")
zarr_store = S3ZipStore(file)

It relies on the fact that ZipStore only checks that

Edit: A load time benchmark shows that a ZipStore created from a local zip file suffers a similar performance hit compared to LocalStore.

Chart: "Random Sentinel 2 patch time series load time benchmark (5100 m x 5100 m, 1 year)", mean load time (s):
S3 Zarr: 7.53
S3 zipped Zarr: 23.9
NVMe Zarr: 1.12
NVMe zipped Zarr: 3.21
-
Here's a rudimentary async fsspec read-only file system for uncompressed ("store" compression) zip files that minimizes the number of reads (initially 2 reads, then 1 read per file). With this async file system, reading from a zipped Zarr v3 in S3 or local storage is no longer slower than reading from an unzipped Zarr. The code is also in our benchmark repo.

Chart: "Random Sentinel 2 patch time series load time benchmark (5100 m x 5100 m, 1 year)", mean load time (s):
S3 Zarr: 7.39
S3 zipped Zarr (async): 7.17
NVMe Zarr: 1.94
NVMe zipped Zarr (async): 1.67
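The file system in this comment only accepts uncompressed archives whose central directory lists each directory before its contents. As a rough sketch of how such an archive could be produced with the standard library alone (the helper name `write_stored_zip` and the sample file names are made up for illustration, and this is not necessarily how the benchmark archives were built):

```python
import io
import zipfile


def write_stored_zip(target, files):
    """Write `files` (dict of path -> bytes) to `target` as an uncompressed zip.

    An explicit entry is written for every directory before the first
    file or subdirectory that lives inside it.
    """
    written_dirs = set()
    with zipfile.ZipFile(target, mode="w", compression=zipfile.ZIP_STORED) as zf:
        for name in sorted(files):
            parts = name.split("/")[:-1]
            # Emit missing parent directories, outermost first
            for depth in range(1, len(parts) + 1):
                dirname = "/".join(parts[:depth]) + "/"
                if dirname not in written_dirs:
                    zf.writestr(dirname, b"")
                    written_dirs.add(dirname)
            zf.writestr(name, files[name])


# Hypothetical store contents, written to an in-memory buffer
buffer = io.BytesIO()
write_stored_zip(
    buffer,
    {"zarr.json": b"{}", "a/zarr.json": b"{}", "a/c/0": b"\x00" * 16},
)
names = zipfile.ZipFile(buffer).namelist()
```

Passing a file path instead of the `io.BytesIO` buffer writes the archive to disk. Because `zipfile` appends entries in the order they are written, the central directory order matches the order of the local headers, which is the other assumption the file system makes.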
Usage:

# Async S3 zipped Zarr
import s3fs
import zarr

# S3_ENDPOINT, S3_BUCKET and ZIP_PATH are placeholders for your own values
s3 = s3fs.S3FileSystem(anon=True, endpoint_url=S3_ENDPOINT, asynchronous=True)
zipfs = ReadOnlyZipFileSystem(s3, f"{S3_BUCKET}/{ZIP_PATH}")
zarr_store = zarr.storage.FsspecStore(fs=zipfs, read_only=True, path="")

# Async local zipped Zarr
from fsspec.implementations.local import LocalFileSystem
from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper

local_fs = LocalFileSystem()
async_local_fs = AsyncFileSystemWrapper(local_fs)
zipfs = ReadOnlyZipFileSystem(async_local_fs, ZIP_PATH)
zarr_store = zarr.storage.FsspecStore(fs=zipfs, read_only=True, path="")

Code:

import asyncio
import posixpath
import struct
from typing import List, Optional

from fsspec.asyn import AsyncFileSystem
class ReadOnlyZipFileSystem(AsyncFileSystem):
    """An async read-only file system for uncompressed zip files using fsspec.

    Mounts an uncompressed zip file as a read-only file system. Supports ZIP64.
    Only supports ZIP_STORED (uncompressed). Multi-disk zip files are not
    supported. Written for reading a zipped Zarr from S3 storage and therefore
    only includes methods used by Zarr.

    Reads 64KB from the end of the zip file to capture the ZIP64 end of central
    directory (EOCD), the ZIP64 EOCD locator, and the standard EOCD, using
    _cat_file with negative start offset and therefore not needing to query the
    file size first. Reads the CD using _cat_file with positive start offset.
    Parses the CD for the file names and the header offsets. Requires that the
    CD contains an entry for each directory (except for the autogenerated root
    dir) before the entries of its files and subdirectories. Requires that
    the CD entries appear in the same order as the file headers and the files.
    Assumes that there are no gaps between file headers and files. This way the
    file headers need not be read, because the file offset will be the offset
    of the next file header (or CD for the last file) minus the file size. File
    datetimes are not available.

    While not currently supported, support for compressed zip files could be
    implemented by reading the file headers (ending the read at the next header
    or CD) and by decompression.
    """

    protocol = "zipfs"
    MAX_ZIP_TAIL_READ = 64 * 1024

    def __init__(self, fs: AsyncFileSystem, path: str, **kwargs):
        """Initialize the ReadOnlyZipFileSystem.

        Args:
            fs: The underlying AsyncFileSystem containing the zip file.
            path: Path to the zip file in the underlying file system.
            **kwargs: Additional arguments passed to AsyncFileSystem.
        """
        super().__init__(**kwargs)
        self.asynchronous = True
        self.fs = fs
        self.path = path
        self._files = None
        self._lock = asyncio.Lock()
    async def _initialize(self):
        """Initialize self._files by reading and parsing the central directory.

        All other methods that require self._files first await this method.
        They never modify self._files. An asyncio.Lock ensures that concurrent
        coroutines do not run the initialization twice (it is not a thread
        lock). The other methods need not lock explicitly.
        """
        async with self._lock:
            if self._files is not None:
                return
            # Read tail of file (up to MAX_ZIP_TAIL_READ) from the end
            data = await self.fs._cat_file(self.path, start=-self.MAX_ZIP_TAIL_READ, end=None)
            if len(data) < 22:  # Minimum size for standard EOCD
                raise ValueError(f"EOCD doesn't fit in {self.path}: {len(data)} bytes")
            # EOCD variables and their lengths (order matters)
            # These dicts use exact wording from https://pkwaredownloads.blob.core.windows.net/pkware-general/Documentation/APPNOTE-6.3.9.TXT
            eocd_var_lengths = {
                'end of central dir signature': 4,  # Signature (0x06054b50)
                'number of this disk': 2,  # Disk number
                'number of the disk with the start of the central directory': 2,  # Disk number of the central directory
                'total number of entries in the central directory on this disk': 2,  # Number of entries on this disk
                'total number of entries in the central directory': 2,  # Total number of entries
                'size of the central directory': 4,  # Size of the central directory
                'offset of start of central directory with respect to the starting disk number': 4,  # Offset of the start of the central directory
                '.ZIP file comment length': 2  # Length of the comment
            }
            # ZIP64 EOCD variables and their lengths (order matters)
            zip64_eocd_var_lengths = {
                'zip64 end of central dir signature': 4,
                'size of zip64 end of central directory record': 8,
                'version made by': 2,
                'version needed to extract': 2,
                'number of this disk': 4,
                'number of the disk with the start of the central directory': 4,
                'total number of entries in the central directory on this disk': 8,
                'total number of entries in the central directory': 8,
                'size of the central directory': 8,
                'offset of start of central directory with respect to the starting disk number': 8
            }
            # Map lengths to struct formats
            var_length_to_format = {
                2: 'H',  # Unsigned short (2 bytes)
                4: 'L',  # Unsigned long (4 bytes)
                8: 'Q'   # Unsigned long long (8 bytes)
            }
            # Parse EOCD
            eocd = {}
            is_zip64 = False
            eocd_pos = data[:-22+4].rfind(b'\x50\x4b\x05\x06')  # 22 bytes for minimum EOCD, 4 bytes for EOCD signature
            if eocd_pos == -1:
                raise ValueError(f"No EOCD in the last {self.MAX_ZIP_TAIL_READ} bytes of {self.path}")
            pos = eocd_pos
            for var, length in eocd_var_lengths.items():
                eocd[var] = struct.unpack_from(f'<{var_length_to_format[length]}', data, pos)[0]
                # Check for ZIP64 values 0xFFFF or 0xFFFFFFFF
                if eocd[var] == 2**(length*8) - 1:
                    eocd[var] = None
                    is_zip64 = True
                pos += length
            if is_zip64:
                if eocd_pos < 56 + 20:  # 56 bytes for ZIP64 EOCD + 20 bytes for locator, both before the EOCD
                    raise ValueError(f"ZIP64 EOCD and ZIP64 EOCD locator do not fit in {self.path}")
                # Find ZIP64 EOCD
                zip64_eocd_pos = data[:eocd_pos-20-56+4].rfind(b'\x50\x4b\x06\x06')  # 20 bytes for locator, 56 bytes for ZIP64 EOCD, 4 bytes for ZIP64 EOCD signature
                if zip64_eocd_pos == -1:
                    raise ValueError(f"No ZIP64 EOCD in the last {self.MAX_ZIP_TAIL_READ} bytes of {self.path}")
                pos = zip64_eocd_pos
                for var, length in zip64_eocd_var_lengths.items():
                    eocd[var] = struct.unpack_from(f'<{var_length_to_format[length]}', data, pos)[0]
                    pos += length
            # Require single-disk zip
            if (
                eocd['number of this disk'] != 0
                or eocd['number of the disk with the start of the central directory'] != 0
                or eocd['total number of entries in the central directory on this disk'] != eocd['total number of entries in the central directory']
            ):
                raise ValueError(f"Unsupported multi-disk central directory in {self.path}")
            # Convenience variables
            cd_size = eocd['size of the central directory']
            cd_offset = eocd['offset of start of central directory with respect to the starting disk number']
            cd_entries = eocd['total number of entries in the central directory']
            # Read and parse central directory
            if cd_size == 0:
                # Empty zip file: only the autogenerated root dir exists
                self._files = {'': {'children': []}}
                return
            cd_data = await self.fs._cat_file(self.path, start=cd_offset, end=cd_offset + cd_size)
            # Verify that the whole central directory was read
            if len(cd_data) != cd_size:
                raise ValueError(f"Failed to read central directory: expected {cd_size} bytes, got {len(cd_data)}")
            # Central directory file header variables and their lengths
            cd_file_header_var_lengths = {
                'central file header signature': 4,
                'version made by': 2,
                'version needed to extract': 2,
                'general purpose bit flag': 2,
                'compression method': 2,
                'last mod file time': 2,
                'last mod file date': 2,
                'crc-32': 4,
                'compressed size': 4,
                'uncompressed size': 4,
                'file name length': 2,
                'extra field length': 2,
                'file comment length': 2,
                'disk number start': 2,
                'internal file attributes': 2,
                'external file attributes': 4,
                'relative offset of local header': 4
            }
            # Central or local file header ZIP64 extended information extra field variables and their lengths
            # These share names with the standard fields but have different lengths
            zip64_file_header_var_lengths = {
                'uncompressed size': 8,
                'compressed size': 8,
                'relative offset of local header': 8,
                'disk number start': 4
            }
            # Autocreate root dir. Build the listing in a local variable so that
            # self._files is only set once parsing has succeeded.
            files = {
                '': {
                    'children': []
                }
            }
            # Parse central directory entries
            pos = 0
            previous_file = None
            for file_index in range(cd_entries):
                if pos + 46 > len(cd_data):  # 46 bytes for the central directory file header
                    raise ValueError(f"Truncated central directory entry in {self.path}")
                cd_file_header = {}
                for var, length in cd_file_header_var_lengths.items():
                    cd_file_header[var] = struct.unpack_from(f'<{var_length_to_format[length]}', cd_data, pos)[0]
                    # Check for ZIP64 values 0xFFFF or 0xFFFFFFFF
                    if cd_file_header[var] == 2**(length*8) - 1:
                        cd_file_header[var] = None
                    pos += length
                if cd_file_header['central file header signature'] != 0x02014b50:
                    raise ValueError(f"Invalid central directory header signature in {self.path}")
                if cd_file_header['compression method'] != 0 or cd_file_header['compressed size'] != cd_file_header['uncompressed size']:
                    raise ValueError(f"File in {self.path} is not stored (uncompressed)")
                utf8 = cd_file_header['general purpose bit flag'] & 0x800 != 0  # Bit 11
                # Convenience variables
                fname_len = cd_file_header['file name length']
                extra_len = cd_file_header['extra field length']
                comment_len = cd_file_header['file comment length']
                # Read filename
                if pos + fname_len > len(cd_data):
                    raise ValueError(f"Truncated filename in {self.path}")
                if utf8:
                    fname = cd_data[pos:pos+fname_len].decode('utf-8')
                else:
                    fname = cd_data[pos:pos+fname_len].decode('ascii')
                pos += fname_len
                # Parse extra field
                extra_end = pos + extra_len
                if extra_end > len(cd_data):
                    raise ValueError(f"Truncated extra field in {self.path}")
                while pos < extra_end:
                    if pos + 4 > extra_end:
                        raise ValueError(f"Truncated extra field in {self.path}")
                    tag, size = struct.unpack_from('<HH', cd_data, pos)
                    pos += 4
                    if pos + size > extra_end:
                        raise ValueError(f"Truncated extra field in {self.path}")
                    backup_pos = pos
                    if tag == 0x0001:
                        # ZIP64 extended information extra field
                        for var, length in zip64_file_header_var_lengths.items():
                            # Only parse variables that were marked as ZIP64
                            if cd_file_header[var] is None:
                                cd_file_header[var] = struct.unpack_from(f'<{var_length_to_format[length]}', cd_data, pos)[0]
                                pos += length
                    else:
                        # Other extra fields (e.g. 0x5455 Extended Timestamp,
                        # 0x7875 Info-ZIP New Unix Extra Field) are not handled, skip them
                        pos += size
                    if backup_pos + size != pos:
                        raise ValueError(f"Invalid extra field size in {self.path}")
                    pos = backup_pos + size
                # Convenience variables
                size = cd_file_header['compressed size']
                offset = cd_file_header['relative offset of local header']
                # Skip comment
                pos += comment_len
                # Detect directory
                is_dir = fname.endswith('/')
                if is_dir:
                    # Remove trailing slash
                    fname = fname[:-1]
                # Store file info
                if is_dir:
                    # Dirs can be recognized by key 'children' existing
                    file = {
                        'children': []
                    }
                else:
                    # It's a file
                    file = {
                        'size': size,
                        'offset': offset
                    }
                files[fname] = file
                # Add file as the parent directory's child
                if fname != '':
                    parent = posixpath.dirname(fname)
                    if parent not in files:
                        raise NotImplementedError(f"Autocreation of parent folder {parent} not implemented, in {self.path}")
                    files[parent]['children'].append(fname)
                # Calculate offset of previous file
                if previous_file is not None and 'children' not in previous_file:
                    # We require the files to be stored in the order of the central directory entries,
                    # although sorting would be an option
                    if offset <= previous_file['offset']:
                        raise ValueError(f"Non-ascending order of local header offsets in {self.path}")
                    # Previous file's offset = current file's offset - previous file's size
                    # This should work unless there is empty space in the zip file, which is unlikely.
                    previous_file['offset'] = offset - previous_file['size']
                previous_file = file
            # Calculate offset of the last file
            if previous_file is not None and 'children' not in previous_file:
                if cd_offset <= previous_file['offset']:
                    raise ValueError(f"Non-ascending order of local header offsets in {self.path}")
                # Last file ends where central directory begins
                previous_file['offset'] = cd_offset - previous_file['size']
            # Publish the listing only after parsing has succeeded
            self._files = files
    async def _ls(self, path: str, detail: bool = True, **kwargs) -> List:
        """List files and directories in the given path.

        If the path points to a file, list just the file.
        """
        # Always await self._initialize() in functions needing self._files
        await self._initialize()
        # Internally we don't use a root slash, so strip it. Also strip any trailing slash.
        path = posixpath.normpath(path).strip('/')
        if path == '.':
            # normpath('') returns '.', which here means the root dir
            path = ''

        # Helper function to get file name or details
        def get_file_listing(file, fname, detail):
            if detail:
                if 'children' in file:
                    return {
                        'name': f'/{fname}',
                        'type': 'directory',
                        'size': 0,
                        'created': None,
                        'islink': False
                    }
                else:
                    return {
                        'name': f'/{fname}',
                        'type': 'file',
                        'size': file['size'],
                        'created': None,
                        'islink': False
                    }
            else:
                return f'/{fname}'

        # List children
        results = []
        if path not in self._files:
            raise FileNotFoundError(f"Path {path} not found")
        file = self._files[path]
        if 'children' in file:
            # Path points to a dir
            for child in file['children']:
                results.append(get_file_listing(self._files[child], child, detail))
        else:
            # Path points to a file
            results = [get_file_listing(file, path, detail)]
        return results
    async def _cat_file(self, path: str, start: Optional[int] = None, end: Optional[int] = None, **kwargs) -> bytes:
        """Read the contents of a file in the zip."""
        # Always await self._initialize() in functions needing self._files
        await self._initialize()
        # Internally we don't use a root slash, so strip it. Also strip any trailing slash.
        path = posixpath.normpath(path).strip('/')
        # Check if the file is available
        if path not in self._files:
            raise FileNotFoundError(f"File {path} not found")
        elif 'children' in self._files[path]:
            raise FileNotFoundError(f"{path} is a directory")
        # Get offset and size of the file in the zip file
        info = self._files[path]
        offset = info['offset']
        size = info['size']
        # Set start to beginning of file if not specified
        start = start or 0
        # Convert negative start (relative to end of file) to positive start
        if start < 0:
            start = max(0, size + start)  # Clamp too large negative start to the beginning of file
        # Set end to end of file if not specified (an explicit end=0 is kept as 0)
        if end is None:
            end = size
        # Convert negative end (relative to end of file) to positive end
        if end < 0:
            end = max(0, size + end)  # Clamp too large negative end to the beginning of file
        # For start beyond the end of the file or the end, return empty data
        if start >= size or end <= start:
            return b''
        # Calculate zip file read start and read size
        read_start = offset + start
        read_size = min(end, size) - start  # Clamp too large end at size
        # Read data
        data = await self.fs._cat_file(self.path, start=read_start, end=read_start+read_size)
        return data
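To make the offset trick from the class docstring concrete, here is a small standard-library sketch. It parses the EOCD and the central directory of a non-ZIP64 archive and recovers each file's bytes as "next header offset minus size", without reading any local file header. The archive contents are made up, and the sketch ignores ZIP64, archive comments and directory entries, so it is an illustration rather than a replacement for the code above:

```python
import io
import struct
import zipfile

# Build a small uncompressed zip in memory (hypothetical contents)
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w", compression=zipfile.ZIP_STORED) as zf:
    zf.writestr("zarr.json", b'{"zarr_format": 3}')
    zf.writestr("c/0", b"\x01\x02\x03\x04")
data = buffer.getvalue()

# The standard EOCD record is 22 bytes when there is no archive comment
eocd_pos = data.rfind(b"PK\x05\x06")
(_sig, _disk, _cd_disk, _entries_on_disk, cd_entries,
 cd_size, cd_offset, _comment_len) = struct.unpack_from("<IHHHHIIH", data, eocd_pos)

# Each central directory entry is a 46-byte fixed header followed by
# the file name, the extra field and the file comment
entries = []
pos = cd_offset
for _ in range(cd_entries):
    fields = struct.unpack_from("<IHHHHHHIIIHHHHHII", data, pos)
    size, name_len, extra_len, comment_len = fields[8], fields[10], fields[11], fields[12]
    header_offset = fields[16]
    name = data[pos + 46 : pos + 46 + name_len].decode("utf-8")
    entries.append((name, size, header_offset))
    pos += 46 + name_len + extra_len + comment_len

# A file's data ends where the next local header (or the CD) starts,
# so its data offset is that boundary minus the file size
boundaries = [header_offset for _, _, header_offset in entries[1:]] + [cd_offset]
contents = {
    name: data[next_start - size : next_start]
    for (name, size, _), next_start in zip(entries, boundaries)
}
```

This only works because `zipfile` writes the entries back to back for a seekable target; an archive with data descriptors or padding between entries would break the "no gaps" assumption.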
-
I searched and could not find an example of accessing a .zarr.zip from an S3 endpoint without having to first download it entirely. The provided zarr.storage.ZipStore only works on a local path (right?).

I experimented and found that this works:

I am wondering if this is alright. Is there anything that could be improved with this approach?

My use case is read-only. I understand that this approach would not be able to handle updates without updating the entire .zarr.zip.

And more generally, if someone else is searching for a solution like this, I hope this helps!