Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ dependencies = [
"pydantic>=2.12.0",
"pydantic-settings>=2.6.1",
"rich>=14.1.0",
"segy>=0.5.4",
"segy>=0.6.0",
"tqdm>=4.67.1",
"universal-pathlib>=0.3.3",
"xarray>=2025.10.1",
Expand Down
4 changes: 2 additions & 2 deletions src/mdio/ingestion/segy/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

from mdio.api.io import _normalize_path
from mdio.converters.exceptions import GridTraceCountError
from mdio.converters.type_converter import to_structured_type
from mdio.core.grid import Grid
from mdio.ingestion.dataset_factory import build_mdio_dataset
from mdio.ingestion.grid_qc import grid_density_qc
Expand All @@ -25,6 +24,7 @@
from mdio.ingestion.segy.validation import validate_spec_in_template
from mdio.segy.file import get_segy_file_info
from mdio.segy.geometry import validate_overrides_for_template
from mdio.segy.utilities import build_mdio_header_type

if TYPE_CHECKING:
from pathlib import Path
Expand Down Expand Up @@ -173,7 +173,7 @@ def segy_to_mdio( # noqa: PLR0913

grid = _build_grid(dimensions, indexed_headers, segy_file_info.num_traces)

header_dtype = to_structured_type(segy_spec.trace.header.dtype)
header_dtype = build_mdio_header_type(segy_spec)
extra_variables = build_raw_header_variables(schema)
mdio_ds = build_mdio_dataset(
schema=schema,
Expand Down
51 changes: 50 additions & 1 deletion src/mdio/segy/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@

import numpy as np
from dask.array.core import normalize_chunks
from segy.schema import ScalarType as SegyScalarType

from mdio.builder.schemas.dtype import ScalarType as MdioScalarType
from mdio.builder.schemas.dtype import StructuredType
from mdio.converters.type_converter import to_numpy_dtype
from mdio.converters.type_converter import to_structured_type

if TYPE_CHECKING:
from dask.array import Array as DaskArray
Expand All @@ -19,6 +25,46 @@
logger = logging.getLogger(__name__)


def ibm32_header_field_names(segy_spec: SegySpec) -> set[str]:
"""Return the names of trace-header fields declared as IBM 32-bit floats.

The segy schema maps an ``ibm32`` header field to a raw ``uint32`` slot (the 4-byte
IBM word) but decodes it to ``float32`` on read. Callers use these names to promote
the affected fields to ``float32`` so the decoded value is stored and projected
without truncating decimals or wrapping the sign of negative values.

Args:
segy_spec: SEG-Y specification whose trace header fields are inspected.

Returns:
Set of header field names whose declared format is ``ibm32``.
"""
return {field.name for field in segy_spec.trace.header.fields if field.format == SegyScalarType.IBM32}


def build_mdio_header_type(segy_spec: SegySpec) -> StructuredType:
"""Build the MDIO ``headers`` variable type from a SegySpec.

``ibm32`` header fields are promoted from their raw ``uint32`` slot to ``float32`` so
the persisted header matches the decoded array the ingestion worker writes. Without
this promotion the decoded float would be cast down to an integer on write, truncating
decimals (``118.625`` -> ``118``) and wrapping signed values (``-50.25`` -> a large
unsigned integer).

Args:
segy_spec: SEG-Y specification describing the trace header layout.

Returns:
The MDIO structured type for the persisted ``headers`` variable.
"""
structured = to_structured_type(segy_spec.trace.header.dtype)
ibm32_names = ibm32_header_field_names(segy_spec)
for field in structured.fields:
if field.name in ibm32_names:
field.format = MdioScalarType.FLOAT32
return structured


def project_headers_to_segy_spec(headers: DaskArray, segy_spec: SegySpec) -> DaskArray:
"""Project stored MDIO trace headers onto the SegySpec trace header layout.

Expand Down Expand Up @@ -49,7 +95,10 @@ def project_headers_to_segy_spec(headers: DaskArray, segy_spec: SegySpec) -> Das
)
raise ValueError(msg)

target_dtype = np.dtype([(name, spec_header_dtype.fields[name][0].newbyteorder("=")) for name in target_names])
# The export target must equal the dtype the headers were stored with at ingest, so route
# through the same builder. That keeps ibm32 fields as float32, which flows into
# SegyFactory.create_traces for IBM encoding instead of re-truncating to the raw uint32 slot.
target_dtype = to_numpy_dtype(build_mdio_header_type(segy_spec))

# Don't actually project if the dtype is already the same as the target dtype.
if headers.dtype == target_dtype:
Expand Down
154 changes: 154 additions & 0 deletions tests/integration/test_segy_ibm32_header_roundtrip.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
"""Round-trip tests for SEG-Y files that declare IBM float (``ibm32``) trace-header fields.

These guard the IBM-real header data path end-to-end through real file I/O:

* Ingestion (Defect A): mdio used to persist an ``ibm32`` header in its raw ``uint32`` slot,
casting the decoded float down to an integer. That truncated decimals (``118.625`` ->
``118``) and wrapped the sign of negatives (``-50.25`` -> a huge unsigned int). The fix
promotes ``ibm32`` header fields to ``float32`` so the decoded value is stored faithfully.
* Export (Defect B): SegyFactory IBM-encodes ``ibm32`` *header* fields from ``segy`` 0.6.0
onward (the minimum mdio requires), so the full round-trip exercises that encode/decode.
"""

from __future__ import annotations

from typing import TYPE_CHECKING

import fsspec
import numpy as np
from segy.factory import SegyFactory
from segy.schema import HeaderField
from segy.standards import get_segy_standard

from mdio import mdio_to_segy
from mdio.api.io import open_mdio
from mdio.builder.template_registry import TemplateRegistry
from mdio.converters.segy import segy_to_mdio
from mdio.segy.file import SegyFileWrapper

if TYPE_CHECKING:
from pathlib import Path

import pytest
from segy.schema import SegySpec

# Dyadic rationals chosen so they are exactly representable in IBM hex float, letting us assert
# exact equality. They cover the original failure modes: decimal truncation and sign wrapping.
IBM_HEADER_VALUES = np.array([118.625, -50.25, 0.5, -1.5], dtype="float32")

GRID_INLINES = 3
GRID_CROSSLINES = 4
NUM_SAMPLES = 8
IBM_FIELD_NAME = "ibm_attr"


def _ibm32_segy_spec() -> SegySpec:
"""Build a rev1 SegySpec with an ``ibm32`` trace-header field alongside index/coord fields."""
fields = [
HeaderField(name="inline", byte=189, format="int32"),
HeaderField(name="crossline", byte=193, format="int32"),
HeaderField(name="coordinate_scalar", byte=71, format="int16"),
HeaderField(name="cdp_x", byte=181, format="int32"),
HeaderField(name="cdp_y", byte=185, format="int32"),
HeaderField(name="samples_per_trace", byte=115, format="int16"),
HeaderField(name="sample_interval", byte=117, format="int16"),
# Unassigned rev1 bytes 233-240: safe spot for a custom IBM-float attribute.
HeaderField(name=IBM_FIELD_NAME, byte=233, format="ibm32"),
]
spec = get_segy_standard(1).customize(trace_header_fields=fields)
spec.segy_standard = 1
return spec


def _ibm_values_per_trace(num_traces: int) -> np.ndarray:
"""Tile the sample IBM values to cover every trace."""
reps = int(np.ceil(num_traces / IBM_HEADER_VALUES.size))
return np.tile(IBM_HEADER_VALUES, reps)[:num_traces].astype("float32")


def _write_ibm32_segy(path: Path, spec: SegySpec) -> np.ndarray:
"""Write a small 3D SEG-Y whose ``ibm32`` header holds real IBM-float values.

``segy`` >= 0.6.0 exposes ``ibm32`` header fields as ``float32`` in the factory template and
IBM-encodes them on write, so real float values are assigned directly.

Args:
path: Destination path for the generated SEG-Y file.
spec: SegySpec describing the trace header layout (must declare the ibm32 field).

Returns:
The IBM header value assigned to each trace, in trace order.
"""
num_traces = GRID_INLINES * GRID_CROSSLINES
factory = SegyFactory(spec=spec, samples_per_trace=NUM_SAMPLES)
samples = factory.create_trace_sample_template(num_traces)
headers = factory.create_trace_header_template(num_traces)

inlines, crosslines = np.mgrid[0:GRID_INLINES, 0:GRID_CROSSLINES]
headers["inline"] = (10 + inlines).ravel()
headers["crossline"] = (100 + crosslines * 2).ravel()
headers["coordinate_scalar"] = -100
headers["cdp_x"] = (700_000 + inlines * 100).ravel()
headers["cdp_y"] = (4_000_000 + crosslines * 100).ravel()
headers["samples_per_trace"] = NUM_SAMPLES
headers["sample_interval"] = 4000

ibm_values = _ibm_values_per_trace(num_traces)
headers[IBM_FIELD_NAME] = ibm_values

samples[:] = (np.arange(num_traces) + 1)[:, None]

with fsspec.open(path.as_posix(), mode="wb") as fp:
fp.write(factory.create_textual_header())
fp.write(factory.create_binary_header())
fp.write(factory.create_traces(headers, samples))

return ibm_values


def test_ingested_ibm32_header_preserves_value(tmp_path: Path) -> None:
"""SEG-Y -> MDIO must store ibm32 headers as float32 without truncating or wrapping (Defect A)."""
spec = _ibm32_segy_spec()
segy_path = tmp_path / "ibm32.sgy"
mdio_path = tmp_path / "ibm32.mdio"

expected = _write_ibm32_segy(segy_path, spec)

segy_to_mdio(
segy_spec=spec,
mdio_template=TemplateRegistry().get("PostStack3DTime"),
input_path=segy_path,
output_path=mdio_path,
overwrite=True,
)

headers = open_mdio(mdio_path)["headers"].values
stored = headers[IBM_FIELD_NAME].ravel()

assert stored.dtype == np.float32
np.testing.assert_array_equal(stored, expected)


def test_ibm32_header_full_roundtrip(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
"""SEG-Y -> MDIO -> SEG-Y must preserve ibm32 header values end-to-end."""
# The SEG-Y file headers must be persisted at ingest so they can be re-emitted on export.
monkeypatch.setenv("MDIO__IMPORT__SAVE_SEGY_FILE_HEADER", "true")

spec = _ibm32_segy_spec()
segy_path = tmp_path / "ibm32.sgy"
mdio_path = tmp_path / "ibm32.mdio"
segy_rt_path = tmp_path / "ibm32_rt.sgy"

expected = _write_ibm32_segy(segy_path, spec)

segy_to_mdio(
segy_spec=spec,
mdio_template=TemplateRegistry().get("PostStack3DTime"),
input_path=segy_path,
output_path=mdio_path,
overwrite=True,
)
mdio_to_segy(segy_spec=spec, input_path=mdio_path, output_path=segy_rt_path)

roundtripped = SegyFileWrapper(segy_rt_path.as_posix(), spec=spec).trace[:].header[IBM_FIELD_NAME]
np.testing.assert_array_equal(roundtripped, expected)
62 changes: 62 additions & 0 deletions tests/unit/test_segy_header_projection.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
from segy.schema import TraceSpec
from segy.standards import get_segy_standard

from mdio.builder.schemas.dtype import ScalarType as MdioScalarType
from mdio.segy.utilities import build_mdio_header_type
from mdio.segy.utilities import ibm32_header_field_names
from mdio.segy.utilities import project_headers_to_segy_spec


Expand Down Expand Up @@ -162,6 +165,31 @@ def test_projection_independent_of_mdio_field_order(self) -> None:
for name in spec.trace.header.names:
np.testing.assert_array_equal(a[name], b[name])

def test_ibm32_field_projected_as_float32_preserves_decimals_and_sign(self) -> None:
"""An ibm32 header stored as float32 must project to float32, not the raw uint32 slot.

Casting to the raw uint32 slot would truncate decimals and wrap negatives, which is the
ingestion/export corruption (Defect A) this guards against.
"""
num = 4
mdio_dtype = np.dtype([("inline", "<i4"), ("elevation", "<f4")])
headers_np = np.zeros(num, dtype=mdio_dtype)
headers_np["inline"] = np.arange(num, dtype=np.int32)
headers_np["elevation"] = np.array([118.625, -50.25, 0.5, -1.5], dtype=np.float32)
headers = da.from_array(headers_np, chunks=2)

spec = _make_segy_spec(
[
HeaderField(name="inline", byte=189, format="int32"),
HeaderField(name="elevation", byte=193, format="ibm32"),
]
)

projected = project_headers_to_segy_spec(headers, spec).compute()

assert projected.dtype.fields["elevation"][0] == np.dtype("float32")
np.testing.assert_array_equal(projected["elevation"], np.array([118.625, -50.25, 0.5, -1.5], dtype=np.float32))

def test_short_circuit_on_matching_dtype(self) -> None:
"""If source headers dtype exactly matches target dtype, return array unchanged."""
spec = _make_segy_spec(
Expand All @@ -188,3 +216,37 @@ def test_short_circuit_on_matching_dtype(self) -> None:

# The exact same dask array object should be returned (no map_blocks overhead)
assert projected_da is headers_da


class TestBuildMdioHeaderType:
"""Cases covering the ingestion-side MDIO header type derived from a SegySpec."""

def test_ibm32_header_promoted_to_float32(self) -> None:
"""An ibm32 header field must be stored as float32, not the raw uint32 slot."""
spec = _make_segy_spec(
[
HeaderField(name="inline", byte=189, format="int32"),
HeaderField(name="elevation", byte=193, format="ibm32"),
]
)

header_type = build_mdio_header_type(spec)
formats = {field.name: field.format for field in header_type.fields}

assert formats["inline"] == MdioScalarType.INT32
assert formats["elevation"] == MdioScalarType.FLOAT32

def test_non_ibm32_spec_unchanged(self) -> None:
"""Specs without ibm32 fields keep their original scalar types."""
spec = _make_segy_spec(
[
HeaderField(name="inline", byte=189, format="int32"),
HeaderField(name="crossline", byte=193, format="int16"),
]
)

header_type = build_mdio_header_type(spec)
formats = {field.name: field.format for field in header_type.fields}

assert formats == {"inline": MdioScalarType.INT32, "crossline": MdioScalarType.INT16}
assert ibm32_header_field_names(spec) == set()
8 changes: 4 additions & 4 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading