Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
581d574
feat(amber): materialize per-port state to Iceberg storage
aglinxinyuan May 5, 2026
b90ffca
fix
aglinxinyuan May 5, 2026
885846f
fix
aglinxinyuan May 5, 2026
74fda4f
fix
aglinxinyuan May 5, 2026
04ccda6
fix
aglinxinyuan May 5, 2026
a6221a7
Merge branch 'main' into xinyuan-state-materialization
aglinxinyuan May 5, 2026
1fc823f
fix
aglinxinyuan May 6, 2026
ffac23a
Merge branch 'main' into xinyuan-state-materialization
aglinxinyuan May 6, 2026
1e77da7
fix fmt
aglinxinyuan May 6, 2026
97e78b9
Merge branch 'main' into xinyuan-state-materialization
aglinxinyuan May 6, 2026
06ef4f8
fix fmt
aglinxinyuan May 6, 2026
e2f5ea1
fix fmt
aglinxinyuan May 6, 2026
6d7d88e
fix fmt
aglinxinyuan May 6, 2026
8e27025
docs: explain state-before-tuple replay order in materialization readers
aglinxinyuan May 6, 2026
df9e360
docs: note state replay broadcasts to every worker
aglinxinyuan May 6, 2026
4428c9d
update
aglinxinyuan May 6, 2026
2722038
docs: note multi-output-port state fan-out is intentional
aglinxinyuan May 6, 2026
fd74c4c
refactor(pyamber): drop emit_state_with_filter from materialization r…
aglinxinyuan May 6, 2026
1cef2ff
fix fmt
aglinxinyuan May 6, 2026
4223cf5
refactor(storage): make port URIs symmetric over a base URI
aglinxinyuan May 7, 2026
d3bb766
Merge branch 'main' into xinyuan-state-materialization
aglinxinyuan May 7, 2026
3f586de
Merge branch 'main' into xinyuan-state-materialization
Xiao-zhen-Liu May 7, 2026
990acdf
fix(amber): rethrow state writer failures on close
aglinxinyuan May 7, 2026
7da78ed
refactor(amber): rename OutputPortResultWriterThread → OutputPortStor…
aglinxinyuan May 8, 2026
c1b6ae0
chore: remove accidentally-committed .claude/ artifacts
aglinxinyuan May 8, 2026
6f26a5a
Merge branch 'main' into xinyuan-state-materialization
aglinxinyuan May 8, 2026
7fd8e3e
refactor: rename OutputPortConfig.storageURI to storageURIBase
aglinxinyuan May 8, 2026
8d8abfd
Potential fix for pull request finding
aglinxinyuan May 8, 2026
98b1e04
Merge branch 'main' into xinyuan-state-materialization
aglinxinyuan May 8, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ async def assign_port(self, req: AssignPortRequest) -> EmptyReturn:
channel_id=channel_id, port_id=req.port_id
)
else:
storage_uri = None
storage_uri_base = None
if len(req.storage_uris) > 0 and req.storage_uris[0]:
storage_uri = req.storage_uris[0]
storage_uri_base = req.storage_uris[0]
self.context.output_manager.add_output_port(
req.port_id, Schema(raw_schema=req.schema), storage_uri
req.port_id, Schema(raw_schema=req.schema), storage_uri_base
)
return EmptyReturn()
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
from core.models.payload import DataPayload, DataFrame
from core.models.state import State
from core.storage.document_factory import DocumentFactory
from core.storage.vfs_uri_factory import VFSURIFactory
from core.storage.runnables.port_storage_writer import (
PortStorageWriter,
PortStorageWriterElement,
Expand Down Expand Up @@ -87,6 +88,10 @@ def __init__(self, worker_id: str):
PortIdentity, typing.Tuple[Queue, PortStorageWriter, Thread]
] = dict()

self._port_state_writers: typing.Dict[
PortIdentity, typing.Tuple[Queue, PortStorageWriter, Thread]
] = dict()

def is_missing_output_ports(self):
"""
This method is only used for ensuring correct region execution.
Expand All @@ -107,26 +112,30 @@ def add_output_port(
self,
port_id: PortIdentity,
schema: Schema,
storage_uri: typing.Optional[str] = None,
storage_uri_base: typing.Optional[str] = None,
) -> None:
if port_id.id is None:
port_id.id = 0
if port_id.internal is None:
port_id.internal = False

if storage_uri is not None:
self.set_up_port_storage_writer(port_id, storage_uri)
if storage_uri_base is not None:
self.set_up_port_storage_writer(port_id, storage_uri_base)

# each port can only be added and initialized once.
if port_id not in self._ports:
self._ports[port_id] = WorkerPort(schema)

def set_up_port_storage_writer(self, port_id: PortIdentity, storage_uri: str):
def set_up_port_storage_writer(self, port_id: PortIdentity, storage_uri_base: str):
"""
Create a separate thread for saving output tuples of a port
to storage in batch.
to storage in batch, and open a long-lived buffered writer for
state materialization on the same port. `storage_uri_base` is the
port's base URI; the result and state URIs are derived from it.
"""
document, _ = DocumentFactory.open_document(storage_uri)
document, _ = DocumentFactory.open_document(
VFSURIFactory.result_uri(storage_uri_base)
)
buffered_item_writer = document.writer(str(get_worker_index(self.worker_id)))
writer_queue = Queue()
port_storage_writer = PortStorageWriter(
Expand All @@ -144,6 +153,29 @@ def set_up_port_storage_writer(self, port_id: PortIdentity, storage_uri: str):
writer_thread,
)

state_document, _ = DocumentFactory.open_document(
VFSURIFactory.state_uri(storage_uri_base)
)
state_buffered_item_writer = state_document.writer(
str(get_worker_index(self.worker_id))
)
state_writer_queue = Queue()
state_port_writer = PortStorageWriter(
buffered_item_writer=state_buffered_item_writer,
queue=state_writer_queue,
)
state_writer_thread = threading.Thread(
target=state_port_writer.run,
daemon=True,
name=f"port_state_writer_thread_{port_id}",
)
state_writer_thread.start()
self._port_state_writers[port_id] = (
state_writer_queue,
state_port_writer,
state_writer_thread,
)

def get_port(self, port_id=None) -> WorkerPort:
return list(self._ports.values())[0]

Expand Down Expand Up @@ -171,6 +203,20 @@ def save_tuple_to_storage_if_needed(self, tuple_: Tuple, port_id=None) -> None:
PortStorageWriterElement(data_tuple=tuple_)
)

def save_state_to_storage_if_needed(self, state: State, port_id=None) -> None:
    """Persist a state row to the per-port state writer queue(s).

    When ``port_id`` is given and has a registered state writer, the row
    goes only to that port's queue; an unknown ``port_id`` is silently
    ignored. When ``port_id`` is omitted, the same row is fanned out to
    every output port's state table — mirroring the broadcast-to-all-workers
    behavior on the emit side: state is shared context, not per-key data,
    so every downstream reader of the materialization needs the full set.
    """
    element = PortStorageWriterElement(data_tuple=state.to_tuple())
    if port_id is not None:
        entry = self._port_state_writers.get(port_id)
        if entry is not None:
            entry[0].put(element)
        return
    # Broadcast: enqueue the same element on every port's state queue.
    for state_queue, _writer, _thread in self._port_state_writers.values():
        state_queue.put(element)

Comment thread
aglinxinyuan marked this conversation as resolved.
def close_port_storage_writers(self) -> None:
"""
Flush the buffers of port storage writers and wait for all the
Expand All @@ -184,6 +230,11 @@ def close_port_storage_writers(self) -> None:
for _, _, writer_thread in self._port_storage_writers.values():
# This blocking call will wait for all the writer to finish commit
writer_thread.join()
for _, state_writer, _ in self._port_state_writers.values():
state_writer.stop()
for _, _, state_writer_thread in self._port_state_writers.values():
state_writer_thread.join()
self._port_state_writers.clear()

def add_partitioning(self, tag: PhysicalLink, partitioning: Partitioning) -> None:
"""
Expand Down
1 change: 1 addition & 0 deletions amber/src/main/python/core/runnables/main_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ def process_input_state(self) -> None:
payload=batch,
)
)
self.context.output_manager.save_state_to_storage_if_needed(output_state)

def process_tuple_with_udf(self) -> Iterator[Optional[Tuple]]:
"""
Expand Down
107 changes: 59 additions & 48 deletions amber/src/main/python/core/storage/document_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,30 +61,35 @@ def create_document(uri: str, schema: Schema) -> VirtualDocument:
if parsed_uri.scheme == VFSURIFactory.VFS_FILE_URI_SCHEME:
_, _, _, resource_type = VFSURIFactory.decode_uri(uri)

if resource_type in {VFSResourceType.RESULT}:
storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)

# Convert Amber Schema to Iceberg Schema with LARGE_BINARY
# field name encoding
iceberg_schema = amber_schema_to_iceberg_schema(schema)

create_table(
IcebergCatalogInstance.get_instance(),
StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE,
storage_key,
iceberg_schema,
override_if_exists=True,
)

return IcebergDocument[Tuple](
StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE,
storage_key,
iceberg_schema,
amber_tuples_to_arrow_table,
arrow_table_to_amber_tuples,
)
else:
raise ValueError(f"Resource type {resource_type} is not supported")
match resource_type:
case VFSResourceType.RESULT:
namespace = StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE
case VFSResourceType.STATE:
namespace = StorageConfig.ICEBERG_TABLE_STATE_NAMESPACE
case _:
raise ValueError(f"Resource type {resource_type} is not supported")

storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)
# Convert Amber Schema to Iceberg Schema with LARGE_BINARY
# field name encoding
iceberg_schema = amber_schema_to_iceberg_schema(schema)

create_table(
IcebergCatalogInstance.get_instance(),
namespace,
storage_key,
iceberg_schema,
override_if_exists=True,
)

return IcebergDocument[Tuple](
namespace,
storage_key,
iceberg_schema,
amber_tuples_to_arrow_table,
arrow_table_to_amber_tuples,
)

else:
raise NotImplementedError(
f"Unsupported URI scheme: {parsed_uri.scheme} for creating the document"
Expand All @@ -96,30 +101,36 @@ def open_document(uri: str) -> typing.Tuple[VirtualDocument, Optional[Schema]]:
if parsed_uri.scheme == "vfs":
_, _, _, resource_type = VFSURIFactory.decode_uri(uri)

if resource_type in {VFSResourceType.RESULT}:
storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)

table = load_table_metadata(
IcebergCatalogInstance.get_instance(),
StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE,
storage_key,
)

if table is None:
raise ValueError("No storage is found for the given URI")

amber_schema = Schema(table.schema().as_arrow())

document = IcebergDocument(
StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE,
storage_key,
table.schema(),
amber_tuples_to_arrow_table,
arrow_table_to_amber_tuples,
)
return document, amber_schema
else:
raise ValueError(f"Resource type {resource_type} is not supported")
match resource_type:
case VFSResourceType.RESULT:
namespace = StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE
case VFSResourceType.STATE:
namespace = StorageConfig.ICEBERG_TABLE_STATE_NAMESPACE
case _:
raise ValueError(f"Resource type {resource_type} is not supported")

storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)

table = load_table_metadata(
IcebergCatalogInstance.get_instance(),
namespace,
storage_key,
)

if table is None:
raise ValueError("No storage is found for the given URI")

amber_schema = Schema(table.schema().as_arrow())

document = IcebergDocument(
namespace,
storage_key,
table.schema(),
amber_tuples_to_arrow_table,
arrow_table_to_amber_tuples,
)
return document, amber_schema

else:
raise NotImplementedError(
f"Unsupported URI scheme: {parsed_uri.scheme} for opening the document"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@
from core.architecture.sendsemantics.round_robin_partitioner import (
RoundRobinPartitioner,
)
from core.models import Tuple, InternalQueue, DataFrame, DataPayload
from core.models import Tuple, InternalQueue, DataFrame, DataPayload, State, StateFrame
from core.models.internal_queue import DataElement, ECMElement
from core.storage.document_factory import DocumentFactory
from core.storage.vfs_uri_factory import VFSURIFactory
from core.util import Stoppable, get_one_of
from core.util.runnable import Runnable
from core.util.virtual_identity import get_from_actor_id_for_input_port_storage
Expand Down Expand Up @@ -132,14 +133,28 @@ def run(self) -> None:
emits an EndChannel ECM. Use the same partitioner implementation as that in
output manager, where a tuple is batched by the partitioner and only
selected as the input of this worker according to the partitioner.

States and tuples are persisted to separate tables, so the original
interleaving is lost and replay has to pick an order: we replay states
first because downstream operators typically need their state set up
before they process the incoming tuples. Every state is broadcast to
every downstream worker -- no partitioner filtering, unlike the tuple
loop. State is shared context (e.g. config / counters), not per-key
data, so each worker needs the full set.
"""
try:
self.materialization, self.tuple_schema = DocumentFactory.open_document(
self.uri
VFSURIFactory.result_uri(self.uri)
)
self.emit_ecm("StartChannel", EmbeddedControlMessageType.NO_ALIGNMENT)
storage_iterator = self.materialization.get()

state_document, _ = DocumentFactory.open_document(
Comment thread
aglinxinyuan marked this conversation as resolved.
VFSURIFactory.state_uri(self.uri)
)
for state_row in state_document.get():
self.emit_payload(StateFrame(State.from_tuple(state_row)))

storage_iterator = self.materialization.get()
# Iterate and process tuples.
for tup in storage_iterator:
if self._stopped:
Expand Down
3 changes: 3 additions & 0 deletions amber/src/main/python/core/storage/storage_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class StorageConfig:
ICEBERG_REST_CATALOG_URI = None
ICEBERG_REST_CATALOG_WAREHOUSE_NAME = None
ICEBERG_TABLE_RESULT_NAMESPACE = None
ICEBERG_TABLE_STATE_NAMESPACE = None
ICEBERG_FILE_STORAGE_DIRECTORY_PATH = None
ICEBERG_TABLE_COMMIT_BATCH_SIZE = None

Expand All @@ -51,6 +52,7 @@ def initialize(
rest_catalog_uri,
rest_catalog_warehouse_name,
table_result_namespace,
table_state_namespace,
directory_path,
commit_batch_size,
s3_endpoint,
Expand All @@ -71,6 +73,7 @@ def initialize(
cls.ICEBERG_REST_CATALOG_WAREHOUSE_NAME = rest_catalog_warehouse_name

cls.ICEBERG_TABLE_RESULT_NAMESPACE = table_result_namespace
cls.ICEBERG_TABLE_STATE_NAMESPACE = table_state_namespace
cls.ICEBERG_FILE_STORAGE_DIRECTORY_PATH = directory_path
cls.ICEBERG_TABLE_COMMIT_BATCH_SIZE = int(commit_batch_size)

Expand Down
17 changes: 14 additions & 3 deletions amber/src/main/python/core/storage/vfs_uri_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class VFSResourceType(str, Enum):
RESULT = "result"
RUNTIME_STATISTICS = "runtimeStatistics"
CONSOLE_MESSAGES = "consoleMessages"
STATE = "state"


class VFSURIFactory:
Expand Down Expand Up @@ -88,12 +89,22 @@ def extract_value(key: str) -> str:
)

@staticmethod
def create_result_uri(workflow_id, execution_id, global_port_id) -> str:
"""Creates a URI pointing to a result storage."""
base_uri = (
def create_port_base_uri(workflow_id, execution_id, global_port_id) -> str:
    """Return the base VFS URI for a port.

    The result and state resource URIs are derived from this base via
    ``result_uri`` / ``state_uri``.
    """
    scheme = VFSURIFactory.VFS_FILE_URI_SCHEME
    port_key = serialize_global_port_identity(global_port_id)
    return (
        f"{scheme}:///wid/{workflow_id.id}"
        f"/eid/{execution_id.id}/globalportid/{port_key}"
    )

@staticmethod
def result_uri(base_uri: str) -> str:
    """Return the result-resource URI nested under a port base URI."""
    return "/".join((base_uri, VFSResourceType.RESULT.value))

@staticmethod
def state_uri(base_uri: str) -> str:
    """Return the state-resource URI nested under a port base URI."""
    return "/".join((base_uri, VFSResourceType.STATE.value))
2 changes: 2 additions & 0 deletions amber/src/main/python/texera_run_python_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def init_loguru_logger(stream_log_level) -> None:
iceberg_rest_catalog_uri,
iceberg_rest_catalog_warehouse_name,
iceberg_table_namespace,
iceberg_table_state_namespace,
iceberg_file_storage_directory_path,
iceberg_table_commit_batch_size,
s3_endpoint,
Expand All @@ -68,6 +69,7 @@ def init_loguru_logger(stream_log_level) -> None:
iceberg_rest_catalog_uri,
iceberg_rest_catalog_warehouse_name,
iceberg_table_namespace,
iceberg_table_state_namespace,
iceberg_file_storage_directory_path,
iceberg_table_commit_batch_size,
s3_endpoint,
Expand Down
Loading
Loading