From 939149f9136bae9ebd86cd7105df8580ec7823cc Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Tue, 5 May 2026 09:53:29 -0700 Subject: [PATCH 1/2] refactor(pyamber): flatten over-nested util packages MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Several `core/util//.py` directories existed only to host a single file — a Java/Scala "one class per directory" convention transplanted into Python where it adds noise without value. Two related anti-patterns existed alongside it: - `core/util/expression_evaluator/__init__.py` was 211 lines of evaluator implementation (the package had no other modules). - `core/util/virtual_identity/__init__.py` was 99 lines of helper functions. `__init__.py` should re-export, not host the implementation. Changes: - Flatten single-file directories to flat modules: `runnable/runnable.py`, `thread/atomic.py`, `protocol/base_protocols.py` → `runnable.py`, `atomic.py`, `base_protocols.py`. - Delete `core/util/operator/` — `__init__.py` was a 16-line license header with no exports and no importers. - Move `expression_evaluator/__init__.py` and `virtual_identity/__init__.py` implementations into flat single-file modules (the dirs only ever held the impl + the test file). - Move tests up alongside the new flat modules. - Replace the hardcoded `core.util.expression_evaluator.test_expression_evaluator` module path inside `test_evaluate_object_expression` with `A.__module__` so the test tracks whatever import path it resolves to (it would have broken on this rename otherwise). - Inside `core/util/stoppable/stoppable_queue_blocking_thread.py`, switch the intra-package `from core.util.stoppable import Stoppable` back to the relative `from .stoppable import Stoppable` to avoid hitting the parent package's __init__ mid-initialization. Closes #4951 --- amber/src/main/python/core/python_worker.py | 4 ++-- .../main/python/core/runnables/data_processor.py | 2 +- .../src/main/python/core/runnables/heartbeat.py | 4 ++-- .../python/core/runnables/network_receiver.py | 2 +- ...input_port_materialization_reader_runnable.py | 2 +- .../main/python/core/util/{thread => }/atomic.py | 0 .../core/util/{protocol => }/base_protocols.py | 0 .../main/python/core/util/buffer/buffer_base.py | 2 +- .../linked_blocking_multi_queue.py | 2 +- .../core/util/customized_queue/queue_base.py | 2 +- .../__init__.py => expression_evaluator.py} | 0 .../main/python/core/util/operator/__init__.py | 16 ---------------- .../python/core/util/{runnable => }/runnable.py | 0 .../stoppable/stoppable_queue_blocking_thread.py | 4 ++-- .../src/main/python/core/util/thread/__init__.py | 16 ---------------- .../__init__.py => virtual_identity.py} | 0 .../python/core/util/{thread => }/test_atomic.py | 2 +- .../test_expression_evaluator.py | 0 .../test_virtual_identity.py | 0 19 files changed, 13 insertions(+), 45 deletions(-) rename amber/src/main/python/core/util/{thread => }/atomic.py (100%) rename amber/src/main/python/core/util/{protocol => }/base_protocols.py (100%) rename amber/src/main/python/core/util/{expression_evaluator/__init__.py => expression_evaluator.py} (100%) delete mode 100644 amber/src/main/python/core/util/operator/__init__.py rename amber/src/main/python/core/util/{runnable => }/runnable.py (100%) delete mode 100644 amber/src/main/python/core/util/thread/__init__.py rename amber/src/main/python/core/util/{virtual_identity/__init__.py => virtual_identity.py} (100%) rename amber/src/test/python/core/util/{thread => }/test_atomic.py (99%) rename amber/src/test/python/core/util/{expression_evaluator => }/test_expression_evaluator.py (100%) rename amber/src/test/python/core/util/{virtual_identity => }/test_virtual_identity.py (100%) diff --git a/amber/src/main/python/core/python_worker.py b/amber/src/main/python/core/python_worker.py index bcd0652d596..d1467de6dbe 100644 --- a/amber/src/main/python/core/python_worker.py +++ b/amber/src/main/python/core/python_worker.py @@ -20,8 +20,8 @@ from core.models.internal_queue import InternalQueue from core.runnables import MainLoop, NetworkReceiver, NetworkSender, Heartbeat -from core.util.runnable.runnable import Runnable -from core.util.stoppable.stoppable import Stoppable +from core.util.runnable import Runnable +from core.util.stoppable import Stoppable class PythonWorker(Runnable, Stoppable): diff --git a/amber/src/main/python/core/runnables/data_processor.py b/amber/src/main/python/core/runnables/data_processor.py index 089a162228a..3998a3ff9ad 100644 --- a/amber/src/main/python/core/runnables/data_processor.py +++ b/amber/src/main/python/core/runnables/data_processor.py @@ -30,7 +30,7 @@ from core.util import Stoppable from core.util.console_message.replace_print import replace_print from core.util.console_message.timestamp import current_time_in_local_timezone -from core.util.runnable.runnable import Runnable +from core.util.runnable import Runnable from proto.org.apache.texera.amber.engine.architecture.rpc import ( ConsoleMessage, ConsoleMessageType, diff --git a/amber/src/main/python/core/runnables/heartbeat.py b/amber/src/main/python/core/runnables/heartbeat.py index 9199518f3f4..1e4c45837d4 100644 --- a/amber/src/main/python/core/runnables/heartbeat.py +++ b/amber/src/main/python/core/runnables/heartbeat.py @@ -24,8 +24,8 @@ from overrides import overrides from threading import Event -from core.util.runnable.runnable import Runnable -from core.util.stoppable.stoppable import Stoppable +from core.util.runnable import Runnable +from core.util.stoppable import Stoppable class Heartbeat(Runnable, Stoppable): diff --git a/amber/src/main/python/core/runnables/network_receiver.py b/amber/src/main/python/core/runnables/network_receiver.py index 659cd65c78d..8ba4fbe1472 100644 --- a/amber/src/main/python/core/runnables/network_receiver.py +++ b/amber/src/main/python/core/runnables/network_receiver.py @@ -43,7 +43,7 @@ ) from core.proxy import ProxyServer from core.util import Stoppable, get_one_of -from core.util.runnable.runnable import Runnable +from core.util.runnable import Runnable from proto.org.apache.texera.amber.engine.architecture.rpc import EmbeddedControlMessage from proto.org.apache.texera.amber.engine.common import ( PythonControlMessage, diff --git a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py index e49c0316cc7..6122bbb8b98 100644 --- a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py +++ b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py @@ -38,7 +38,7 @@ from core.models.internal_queue import DataElement, ECMElement from core.storage.document_factory import DocumentFactory from core.util import Stoppable, get_one_of -from core.util.runnable.runnable import Runnable +from core.util.runnable import Runnable from core.util.virtual_identity import get_from_actor_id_for_input_port_storage from proto.org.apache.texera.amber.core import ( ActorVirtualIdentity, diff --git a/amber/src/main/python/core/util/thread/atomic.py b/amber/src/main/python/core/util/atomic.py similarity index 100% rename from amber/src/main/python/core/util/thread/atomic.py rename to amber/src/main/python/core/util/atomic.py diff --git a/amber/src/main/python/core/util/protocol/base_protocols.py b/amber/src/main/python/core/util/base_protocols.py similarity index 100% rename from amber/src/main/python/core/util/protocol/base_protocols.py rename to amber/src/main/python/core/util/base_protocols.py diff --git a/amber/src/main/python/core/util/buffer/buffer_base.py b/amber/src/main/python/core/util/buffer/buffer_base.py index 244b7357407..10a44d28aae 100644 --- a/amber/src/main/python/core/util/buffer/buffer_base.py +++ b/amber/src/main/python/core/util/buffer/buffer_base.py @@ -17,7 +17,7 @@ from abc import ABCMeta -from core.util.protocol.base_protocols import FlushedGetable, Putable +from core.util.base_protocols import FlushedGetable, Putable class IBuffer(FlushedGetable, Putable, metaclass=ABCMeta): diff --git a/amber/src/main/python/core/util/customized_queue/linked_blocking_multi_queue.py b/amber/src/main/python/core/util/customized_queue/linked_blocking_multi_queue.py index 3b46e6db4d7..735f0f6dc0d 100644 --- a/amber/src/main/python/core/util/customized_queue/linked_blocking_multi_queue.py +++ b/amber/src/main/python/core/util/customized_queue/linked_blocking_multi_queue.py @@ -23,7 +23,7 @@ from core.util.customized_queue.inner import inner from core.util.customized_queue.queue_base import IKeyedQueue -from core.util.thread.atomic import AtomicInteger +from core.util.atomic import AtomicInteger K = TypeVar("K") T = TypeVar("T") diff --git a/amber/src/main/python/core/util/customized_queue/queue_base.py b/amber/src/main/python/core/util/customized_queue/queue_base.py index 47b8aac94e4..4ee96312cba 100644 --- a/amber/src/main/python/core/util/customized_queue/queue_base.py +++ b/amber/src/main/python/core/util/customized_queue/queue_base.py @@ -18,7 +18,7 @@ from abc import ABCMeta from dataclasses import dataclass -from core.util.protocol.base_protocols import ( +from core.util.base_protocols import ( Putable, Getable, EmtpyCheckable, diff --git a/amber/src/main/python/core/util/expression_evaluator/__init__.py b/amber/src/main/python/core/util/expression_evaluator.py similarity index 100% rename from amber/src/main/python/core/util/expression_evaluator/__init__.py rename to amber/src/main/python/core/util/expression_evaluator.py diff --git a/amber/src/main/python/core/util/operator/__init__.py b/amber/src/main/python/core/util/operator/__init__.py deleted file mode 100644 index 13a83393a91..00000000000 --- a/amber/src/main/python/core/util/operator/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. diff --git a/amber/src/main/python/core/util/runnable/runnable.py b/amber/src/main/python/core/util/runnable.py similarity index 100% rename from amber/src/main/python/core/util/runnable/runnable.py rename to amber/src/main/python/core/util/runnable.py diff --git a/amber/src/main/python/core/util/stoppable/stoppable_queue_blocking_thread.py b/amber/src/main/python/core/util/stoppable/stoppable_queue_blocking_thread.py index d20073631b3..992ad596fe0 100644 --- a/amber/src/main/python/core/util/stoppable/stoppable_queue_blocking_thread.py +++ b/amber/src/main/python/core/util/stoppable/stoppable_queue_blocking_thread.py @@ -19,8 +19,8 @@ from overrides import overrides from core.util.customized_queue.queue_base import IQueue, QueueControl, QueueElement -from core.util.runnable.runnable import Runnable -from core.util.stoppable.stoppable import Stoppable +from core.util.runnable import Runnable +from .stoppable import Stoppable class StoppableQueueBlockingRunnable(Runnable, Stoppable): diff --git a/amber/src/main/python/core/util/thread/__init__.py b/amber/src/main/python/core/util/thread/__init__.py deleted file mode 100644 index 13a83393a91..00000000000 --- a/amber/src/main/python/core/util/thread/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. diff --git a/amber/src/main/python/core/util/virtual_identity/__init__.py b/amber/src/main/python/core/util/virtual_identity.py similarity index 100% rename from amber/src/main/python/core/util/virtual_identity/__init__.py rename to amber/src/main/python/core/util/virtual_identity.py diff --git a/amber/src/test/python/core/util/thread/test_atomic.py b/amber/src/test/python/core/util/test_atomic.py similarity index 99% rename from amber/src/test/python/core/util/thread/test_atomic.py rename to amber/src/test/python/core/util/test_atomic.py index fa6238e0eb2..0f6b393233c 100644 --- a/amber/src/test/python/core/util/thread/test_atomic.py +++ b/amber/src/test/python/core/util/test_atomic.py @@ -19,7 +19,7 @@ import pytest -from core.util.thread.atomic import AtomicInteger +from core.util.atomic import AtomicInteger class TestAtomicIntegerSingleThreaded: diff --git a/amber/src/test/python/core/util/expression_evaluator/test_expression_evaluator.py b/amber/src/test/python/core/util/test_expression_evaluator.py similarity index 100% rename from amber/src/test/python/core/util/expression_evaluator/test_expression_evaluator.py rename to amber/src/test/python/core/util/test_expression_evaluator.py diff --git a/amber/src/test/python/core/util/virtual_identity/test_virtual_identity.py b/amber/src/test/python/core/util/test_virtual_identity.py similarity index 100% rename from amber/src/test/python/core/util/virtual_identity/test_virtual_identity.py rename to amber/src/test/python/core/util/test_virtual_identity.py From ded866114c67da39e62ee4266ffae16478046434 Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Tue, 5 May 2026 23:48:32 -0700 Subject: [PATCH 2/2] test(pyamber): cover PythonWorker construction, stop cascade, and run drain The util-flatten rename touched `core/python_worker.py`'s import lines, and codecov flagged them because the file had no test coverage at all. Add a focused unit test that stubs `NetworkReceiver`, `NetworkSender`, `MainLoop`, and `Heartbeat`, then exercises: - construction wires the receiver's proxy port into the sender, and the receiver's shutdown callback is bound to `worker.stop` - `stop()` cascades to main loop, sender, and heartbeat - `run()` sets the heartbeat stop event after main loop returns Brings `core/python_worker.py` to 100% line coverage. --- .../test/python/core/test_python_worker.py | 125 ++++++++++++++++++ 1 file changed, 125 insertions(+) create mode 100644 amber/src/test/python/core/test_python_worker.py diff --git a/amber/src/test/python/core/test_python_worker.py b/amber/src/test/python/core/test_python_worker.py new file mode 100644 index 00000000000..25658fc78b7 --- /dev/null +++ b/amber/src/test/python/core/test_python_worker.py @@ -0,0 +1,125 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import pytest + +import core.python_worker as pw + + +class _FakeReceiver: + def __init__(self, input_queue, host): + self.input_queue = input_queue + self.host = host + self.proxy_server = type( + "FakeProxyServer", (), {"get_port_number": staticmethod(lambda: 12345)} + )() + self._shutdown_cb = None + + def register_shutdown(self, cb): + self._shutdown_cb = cb + + def run(self): + pass + + def stop(self): + pass + + +class _FakeSender: + def __init__(self, output_queue, host, port, handshake_port): + self.output_queue = output_queue + self.host = host + self.port = port + self.handshake_port = handshake_port + self.stopped = False + + def run(self): + pass + + def stop(self): + self.stopped = True + + +class _FakeMainLoop: + def __init__(self, worker_id, input_queue, output_queue): + self.worker_id = worker_id + self.stopped = False + + def run(self): + pass + + def stop(self): + self.stopped = True + + +class _FakeHeartbeat: + def __init__(self, host, port, interval, stop_event): + self.host = host + self.port = port + self.interval = interval + self.stop_event = stop_event + self.stopped = False + + def run(self): + pass + + def stop(self): + self.stopped = True + + +@pytest.fixture +def stub_network(monkeypatch): + monkeypatch.setattr(pw, "NetworkReceiver", _FakeReceiver) + monkeypatch.setattr(pw, "NetworkSender", _FakeSender) + monkeypatch.setattr(pw, "MainLoop", _FakeMainLoop) + monkeypatch.setattr(pw, "Heartbeat", _FakeHeartbeat) + + +class TestPythonWorker: + @pytest.mark.timeout(5) + def test_construction_wires_dependencies(self, stub_network): + worker = pw.PythonWorker(worker_id="w-1", host="localhost", output_port=9999) + + # NetworkSender must receive the handshake port from the receiver's + # proxy server — this is the Java→Python wiring contract. + assert worker._network_sender.handshake_port == 12345 + assert worker._network_sender.port == 9999 + # The receiver's shutdown callback is wired to worker.stop so a + # client-side disconnect tears the worker down. + assert worker._network_receiver._shutdown_cb == worker.stop + + @pytest.mark.timeout(5) + def test_stop_cascades_to_main_loop_sender_and_heartbeat(self, stub_network): + worker = pw.PythonWorker(worker_id="w-1", host="localhost", output_port=9999) + + worker.stop() + + assert worker._main_loop.stopped is True + assert worker._network_sender.stopped is True + assert worker._heartbeat.stopped is True + + @pytest.mark.timeout(5) + def test_run_sets_stop_event_after_main_loop_returns(self, stub_network): + worker = pw.PythonWorker(worker_id="w-1", host="localhost", output_port=9999) + + # All fakes' run() return immediately, so run() drains all threads + # without blocking. The contract is that the heartbeat stop event + # is set after the main loop / sender threads join, so the + # heartbeat thread can exit cleanly. + worker.run() + + assert worker._stop_event.is_set()