diff --git a/pyproject.toml b/pyproject.toml index 04d0cd96..71bd684e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,7 +26,7 @@ dependencies = [ "pydantic>=2.12.0", "pydantic-settings>=2.6.1", "rich>=14.1.0", - "segy>=0.5.4", + "segy>=0.6.0", "tqdm>=4.67.1", "universal-pathlib>=0.3.3", "xarray>=2025.10.1", diff --git a/src/mdio/ingestion/segy/pipeline.py b/src/mdio/ingestion/segy/pipeline.py index 16b66c6c..b2c54fab 100644 --- a/src/mdio/ingestion/segy/pipeline.py +++ b/src/mdio/ingestion/segy/pipeline.py @@ -10,7 +10,6 @@ from mdio.api.io import _normalize_path from mdio.converters.exceptions import GridTraceCountError -from mdio.converters.type_converter import to_structured_type from mdio.core.grid import Grid from mdio.ingestion.dataset_factory import build_mdio_dataset from mdio.ingestion.grid_qc import grid_density_qc @@ -25,6 +24,7 @@ from mdio.ingestion.segy.validation import validate_spec_in_template from mdio.segy.file import get_segy_file_info from mdio.segy.geometry import validate_overrides_for_template +from mdio.segy.utilities import build_mdio_header_type if TYPE_CHECKING: from pathlib import Path @@ -173,7 +173,7 @@ def segy_to_mdio( # noqa: PLR0913 grid = _build_grid(dimensions, indexed_headers, segy_file_info.num_traces) - header_dtype = to_structured_type(segy_spec.trace.header.dtype) + header_dtype = build_mdio_header_type(segy_spec) extra_variables = build_raw_header_variables(schema) mdio_ds = build_mdio_dataset( schema=schema, diff --git a/src/mdio/segy/utilities.py b/src/mdio/segy/utilities.py index 02be321d..dbd7d9e5 100644 --- a/src/mdio/segy/utilities.py +++ b/src/mdio/segy/utilities.py @@ -8,6 +8,12 @@ import numpy as np from dask.array.core import normalize_chunks +from segy.schema import ScalarType as SegyScalarType + +from mdio.builder.schemas.dtype import ScalarType as MdioScalarType +from mdio.builder.schemas.dtype import StructuredType +from mdio.converters.type_converter import to_numpy_dtype +from mdio.converters.type_converter import to_structured_type if TYPE_CHECKING: from dask.array import Array as DaskArray @@ -19,6 +25,46 @@ logger = logging.getLogger(__name__) +def ibm32_header_field_names(segy_spec: SegySpec) -> set[str]: + """Return the names of trace-header fields declared as IBM 32-bit floats. + + The segy schema maps an ``ibm32`` header field to a raw ``uint32`` slot (the 4-byte + IBM word) but decodes it to ``float32`` on read. Callers use these names to promote + the affected fields to ``float32`` so the decoded value is stored and projected + without truncating decimals or wrapping the sign of negative values. + + Args: + segy_spec: SEG-Y specification whose trace header fields are inspected. + + Returns: + Set of header field names whose declared format is ``ibm32``. + """ + return {field.name for field in segy_spec.trace.header.fields if field.format == SegyScalarType.IBM32} + + +def build_mdio_header_type(segy_spec: SegySpec) -> StructuredType: + """Build the MDIO ``headers`` variable type from a SegySpec. + + ``ibm32`` header fields are promoted from their raw ``uint32`` slot to ``float32`` so + the persisted header matches the decoded array the ingestion worker writes. Without + this promotion the decoded float would be cast down to an integer on write, truncating + decimals (``118.625`` -> ``118``) and wrapping signed values (``-50.25`` -> a large + unsigned integer). + + Args: + segy_spec: SEG-Y specification describing the trace header layout. + + Returns: + The MDIO structured type for the persisted ``headers`` variable. + """ + structured = to_structured_type(segy_spec.trace.header.dtype) + ibm32_names = ibm32_header_field_names(segy_spec) + for field in structured.fields: + if field.name in ibm32_names: + field.format = MdioScalarType.FLOAT32 + return structured + + def project_headers_to_segy_spec(headers: DaskArray, segy_spec: SegySpec) -> DaskArray: """Project stored MDIO trace headers onto the SegySpec trace header layout. @@ -49,7 +95,10 @@ def project_headers_to_segy_spec(headers: DaskArray, segy_spec: SegySpec) -> Das ) raise ValueError(msg) - target_dtype = np.dtype([(name, spec_header_dtype.fields[name][0].newbyteorder("=")) for name in target_names]) + # The export target must equal the dtype the headers were stored with at ingest, so route + # through the same builder. That keeps ibm32 fields as float32, which flows into + # SegyFactory.create_traces for IBM encoding instead of re-truncating to the raw uint32 slot. + target_dtype = to_numpy_dtype(build_mdio_header_type(segy_spec)) # Don't actually project if the dtype is already the same as the target dtype. if headers.dtype == target_dtype: diff --git a/tests/integration/test_segy_ibm32_header_roundtrip.py b/tests/integration/test_segy_ibm32_header_roundtrip.py new file mode 100644 index 00000000..7a2c7827 --- /dev/null +++ b/tests/integration/test_segy_ibm32_header_roundtrip.py @@ -0,0 +1,154 @@ +"""Round-trip tests for SEG-Y files that declare IBM float (``ibm32``) trace-header fields. + +These guard the IBM-real header data path end-to-end through real file I/O: + +* Ingestion (Defect A): mdio used to persist an ``ibm32`` header in its raw ``uint32`` slot, + casting the decoded float down to an integer. That truncated decimals (``118.625`` -> + ``118``) and wrapped the sign of negatives (``-50.25`` -> a huge unsigned int). The fix + promotes ``ibm32`` header fields to ``float32`` so the decoded value is stored faithfully. +* Export (Defect B): SegyFactory IBM-encodes ``ibm32`` *header* fields from ``segy`` 0.6.0 + onward (the minimum mdio requires), so the full round-trip exercises that encode/decode. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import fsspec +import numpy as np +from segy.factory import SegyFactory +from segy.schema import HeaderField +from segy.standards import get_segy_standard + +from mdio import mdio_to_segy +from mdio.api.io import open_mdio +from mdio.builder.template_registry import TemplateRegistry +from mdio.converters.segy import segy_to_mdio +from mdio.segy.file import SegyFileWrapper + +if TYPE_CHECKING: + from pathlib import Path + + import pytest + from segy.schema import SegySpec + +# Dyadic rationals chosen so they are exactly representable in IBM hex float, letting us assert +# exact equality. They cover the original failure modes: decimal truncation and sign wrapping. +IBM_HEADER_VALUES = np.array([118.625, -50.25, 0.5, -1.5], dtype="float32") + +GRID_INLINES = 3 +GRID_CROSSLINES = 4 +NUM_SAMPLES = 8 +IBM_FIELD_NAME = "ibm_attr" + + +def _ibm32_segy_spec() -> SegySpec: + """Build a rev1 SegySpec with an ``ibm32`` trace-header field alongside index/coord fields.""" + fields = [ + HeaderField(name="inline", byte=189, format="int32"), + HeaderField(name="crossline", byte=193, format="int32"), + HeaderField(name="coordinate_scalar", byte=71, format="int16"), + HeaderField(name="cdp_x", byte=181, format="int32"), + HeaderField(name="cdp_y", byte=185, format="int32"), + HeaderField(name="samples_per_trace", byte=115, format="int16"), + HeaderField(name="sample_interval", byte=117, format="int16"), + # Unassigned rev1 bytes 233-240: safe spot for a custom IBM-float attribute. + HeaderField(name=IBM_FIELD_NAME, byte=233, format="ibm32"), + ] + spec = get_segy_standard(1).customize(trace_header_fields=fields) + spec.segy_standard = 1 + return spec + + +def _ibm_values_per_trace(num_traces: int) -> np.ndarray: + """Tile the sample IBM values to cover every trace.""" + reps = int(np.ceil(num_traces / IBM_HEADER_VALUES.size)) + return np.tile(IBM_HEADER_VALUES, reps)[:num_traces].astype("float32") + + +def _write_ibm32_segy(path: Path, spec: SegySpec) -> np.ndarray: + """Write a small 3D SEG-Y whose ``ibm32`` header holds real IBM-float values. + + ``segy`` >= 0.6.0 exposes ``ibm32`` header fields as ``float32`` in the factory template and + IBM-encodes them on write, so real float values are assigned directly. + + Args: + path: Destination path for the generated SEG-Y file. + spec: SegySpec describing the trace header layout (must declare the ibm32 field). + + Returns: + The IBM header value assigned to each trace, in trace order. + """ + num_traces = GRID_INLINES * GRID_CROSSLINES + factory = SegyFactory(spec=spec, samples_per_trace=NUM_SAMPLES) + samples = factory.create_trace_sample_template(num_traces) + headers = factory.create_trace_header_template(num_traces) + + inlines, crosslines = np.mgrid[0:GRID_INLINES, 0:GRID_CROSSLINES] + headers["inline"] = (10 + inlines).ravel() + headers["crossline"] = (100 + crosslines * 2).ravel() + headers["coordinate_scalar"] = -100 + headers["cdp_x"] = (700_000 + inlines * 100).ravel() + headers["cdp_y"] = (4_000_000 + crosslines * 100).ravel() + headers["samples_per_trace"] = NUM_SAMPLES + headers["sample_interval"] = 4000 + + ibm_values = _ibm_values_per_trace(num_traces) + headers[IBM_FIELD_NAME] = ibm_values + + samples[:] = (np.arange(num_traces) + 1)[:, None] + + with fsspec.open(path.as_posix(), mode="wb") as fp: + fp.write(factory.create_textual_header()) + fp.write(factory.create_binary_header()) + fp.write(factory.create_traces(headers, samples)) + + return ibm_values + + +def test_ingested_ibm32_header_preserves_value(tmp_path: Path) -> None: + """SEG-Y -> MDIO must store ibm32 headers as float32 without truncating or wrapping (Defect A).""" + spec = _ibm32_segy_spec() + segy_path = tmp_path / "ibm32.sgy" + mdio_path = tmp_path / "ibm32.mdio" + + expected = _write_ibm32_segy(segy_path, spec) + + segy_to_mdio( + segy_spec=spec, + mdio_template=TemplateRegistry().get("PostStack3DTime"), + input_path=segy_path, + output_path=mdio_path, + overwrite=True, + ) + + headers = open_mdio(mdio_path)["headers"].values + stored = headers[IBM_FIELD_NAME].ravel() + + assert stored.dtype == np.float32 + np.testing.assert_array_equal(stored, expected) + + +def test_ibm32_header_full_roundtrip(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: + """SEG-Y -> MDIO -> SEG-Y must preserve ibm32 header values end-to-end.""" + # The SEG-Y file headers must be persisted at ingest so they can be re-emitted on export. + monkeypatch.setenv("MDIO__IMPORT__SAVE_SEGY_FILE_HEADER", "true") + + spec = _ibm32_segy_spec() + segy_path = tmp_path / "ibm32.sgy" + mdio_path = tmp_path / "ibm32.mdio" + segy_rt_path = tmp_path / "ibm32_rt.sgy" + + expected = _write_ibm32_segy(segy_path, spec) + + segy_to_mdio( + segy_spec=spec, + mdio_template=TemplateRegistry().get("PostStack3DTime"), + input_path=segy_path, + output_path=mdio_path, + overwrite=True, + ) + mdio_to_segy(segy_spec=spec, input_path=mdio_path, output_path=segy_rt_path) + + roundtripped = SegyFileWrapper(segy_rt_path.as_posix(), spec=spec).trace[:].header[IBM_FIELD_NAME] + np.testing.assert_array_equal(roundtripped, expected) diff --git a/tests/unit/test_segy_header_projection.py b/tests/unit/test_segy_header_projection.py index f5f3d9c8..d60ce545 100644 --- a/tests/unit/test_segy_header_projection.py +++ b/tests/unit/test_segy_header_projection.py @@ -12,6 +12,9 @@ from segy.schema import TraceSpec from segy.standards import get_segy_standard +from mdio.builder.schemas.dtype import ScalarType as MdioScalarType +from mdio.segy.utilities import build_mdio_header_type +from mdio.segy.utilities import ibm32_header_field_names from mdio.segy.utilities import project_headers_to_segy_spec @@ -162,6 +165,31 @@ def test_projection_independent_of_mdio_field_order(self) -> None: for name in spec.trace.header.names: np.testing.assert_array_equal(a[name], b[name]) + def test_ibm32_field_projected_as_float32_preserves_decimals_and_sign(self) -> None: + """An ibm32 header stored as float32 must project to float32, not the raw uint32 slot. + + Casting to the raw uint32 slot would truncate decimals and wrap negatives, which is the + ingestion/export corruption (Defect A) this guards against. + """ + num = 4 + mdio_dtype = np.dtype([("inline", " None: """If source headers dtype exactly matches target dtype, return array unchanged.""" spec = _make_segy_spec( @@ -188,3 +216,37 @@ def test_short_circuit_on_matching_dtype(self) -> None: # The exact same dask array object should be returned (no map_blocks overhead) assert projected_da is headers_da + + +class TestBuildMdioHeaderType: + """Cases covering the ingestion-side MDIO header type derived from a SegySpec.""" + + def test_ibm32_header_promoted_to_float32(self) -> None: + """An ibm32 header field must be stored as float32, not the raw uint32 slot.""" + spec = _make_segy_spec( + [ + HeaderField(name="inline", byte=189, format="int32"), + HeaderField(name="elevation", byte=193, format="ibm32"), + ] + ) + + header_type = build_mdio_header_type(spec) + formats = {field.name: field.format for field in header_type.fields} + + assert formats["inline"] == MdioScalarType.INT32 + assert formats["elevation"] == MdioScalarType.FLOAT32 + + def test_non_ibm32_spec_unchanged(self) -> None: + """Specs without ibm32 fields keep their original scalar types.""" + spec = _make_segy_spec( + [ + HeaderField(name="inline", byte=189, format="int32"), + HeaderField(name="crossline", byte=193, format="int16"), + ] + ) + + header_type = build_mdio_header_type(spec) + formats = {field.name: field.format for field in header_type.fields} + + assert formats == {"inline": MdioScalarType.INT32, "crossline": MdioScalarType.INT16} + assert ibm32_header_field_names(spec) == set() diff --git a/uv.lock b/uv.lock index a00c2392..8b8a7a25 100644 --- a/uv.lock +++ b/uv.lock @@ -1827,7 +1827,7 @@ requires-dist = [ { name = "pydantic-settings", specifier = ">=2.6.1" }, { name = "rich", specifier = ">=14.1.0" }, { name = "s3fs", marker = "extra == 'cloud'", specifier = ">=2025.9.0" }, - { name = "segy", specifier = ">=0.5.4" }, + { name = "segy", specifier = ">=0.6.0" }, { name = "tqdm", specifier = ">=4.67.1" }, { name = "universal-pathlib", specifier = ">=0.3.3" }, { name = "xarray", specifier = ">=2025.10.1" }, @@ -2932,7 +2932,7 @@ wheels = [ [[package]] name = "segy" -version = "0.5.4" +version = "0.6.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "fsspec" }, @@ -2944,9 +2944,9 @@ dependencies = [ { name = "rapidfuzz" }, { name = "typer" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/54/66/19a2a8872a2083dd3dd450893f34c398700c81f12d86767870b252cb60a1/segy-0.5.4.tar.gz", hash = "sha256:e92c34951f4b5d379145d65fd58d97781e281c505d629b0cfeb8e74b2a614b62", size = 45977, upload-time = "2026-02-12T17:44:48.755Z" } +sdist = { url = "https://files.pythonhosted.org/packages/e5/be/39961f0053378ecd2ab42fdffc7d76f8e79ac00f79bbf6ebf31183e9e0f0/segy-0.6.0.tar.gz", hash = "sha256:1ec623361e99ddbf952be286403912e4c50218f49bdddf73a0c907a9a06a74f7", size = 46333, upload-time = "2026-06-16T15:25:23.091Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/db/d2/917372ed5e1b81f4871d4fd7a693ff21e2887f3060b83cb4d86ae4446fdf/segy-0.5.4-py3-none-any.whl", hash = "sha256:a76da693ebfdea76f9ea110d98951012a10cc1d86dd6b2d01ab877333dbeee8a", size = 58324, upload-time = "2026-02-12T17:44:47.71Z" }, + { url = "https://files.pythonhosted.org/packages/ec/ab/c15b6ca03f5e3e8765dbcc477e8963263cf79ea171ed78a193ca5fc8bc4d/segy-0.6.0-py3-none-any.whl", hash = "sha256:82a3c772434ff7a12e0ef371a288362411d90798377c17946e66737a1791f473", size = 58689, upload-time = "2026-06-16T15:25:22.132Z" }, ] [[package]]