From 1e4ec7fd8090a315921610d4fd81648bec259865 Mon Sep 17 00:00:00 2001 From: Yue Chao Qin Date: Tue, 10 Feb 2026 17:52:09 -0500 Subject: [PATCH] feat: GetExecutionNodes API returns number of total and ended nodes --- cloud_pipelines_backend/api_server_sql.py | 22 +++++++++++++++++++--- tests/test_execution_nodes_api_service.py | 22 ++++++++++++++++++++++ 2 files changed, 41 insertions(+), 3 deletions(-) diff --git a/cloud_pipelines_backend/api_server_sql.py b/cloud_pipelines_backend/api_server_sql.py index 47615da..1e018c4 100644 --- a/cloud_pipelines_backend/api_server_sql.py +++ b/cloud_pipelines_backend/api_server_sql.py @@ -519,10 +519,10 @@ class ArtifactNodeIdResponse: id: bts.IdType -@dataclasses.dataclass +@dataclasses.dataclass(kw_only=True) class GetGraphExecutionStateResponse: child_execution_status_stats: dict[bts.IdType, dict[str, int]] - pass + child_execution_status_summary: ExecutionStatusSummary @dataclasses.dataclass(kw_only=True) @@ -697,15 +697,31 @@ def get_graph_execution_state( child_descendants_execution_stat_rows ) + tuple(child_container_execution_stat_rows) child_execution_status_stats: dict[bts.IdType, dict[str, int]] = {} - + total_execution_count = 0 + ended_execution_count = 0 for row in child_execution_stat_rows: + # TODO: Rename this to be _tuple() per version 2.0.19 + # https://docs.sqlalchemy.org/en/20/changelog/changelog_20.html#change-801784234240fc9d4879723c412e74e2 + # + # TODO: If upgrading to SQLAlchemy version 2.1, function tuple() not needed anymore + # https://github.com/sqlalchemy/sqlalchemy/blob/deb949fe05ed8ff0f72f01d53f08f21ba8776aef/lib/sqlalchemy/engine/row.py#L76 child_execution_id, status, count = row.tuple() status_stats = child_execution_status_stats.setdefault( child_execution_id, {} ) status_stats[status.value] = count + total_execution_count += count + if status in bts.CONTAINER_STATUSES_ENDED: + ended_execution_count += count + + summary = ExecutionStatusSummary( + total_executions=total_execution_count, + ended_executions=ended_execution_count, + has_ended=(ended_execution_count == total_execution_count), + ) return GetGraphExecutionStateResponse( child_execution_status_stats=child_execution_status_stats, + child_execution_status_summary=summary, ) def get_container_execution_state( diff --git a/tests/test_execution_nodes_api_service.py b/tests/test_execution_nodes_api_service.py index d188717..90e6944 100644 --- a/tests/test_execution_nodes_api_service.py +++ b/tests/test_execution_nodes_api_service.py @@ -68,6 +68,9 @@ def test_no_children_returns_empty_stats(self): assert isinstance(result, GetGraphExecutionStateResponse) assert result.child_execution_status_stats == {} + assert result.child_execution_status_summary.total_executions == 0 + assert result.child_execution_status_summary.ended_executions == 0 + assert result.child_execution_status_summary.has_ended is True def test_children_with_no_status_are_excluded(self): """Children whose container_execution_status is None are not counted.""" @@ -87,6 +90,9 @@ def test_children_with_no_status_are_excluded(self): result = self.service.get_graph_execution_state(session, parent.id) assert result.child_execution_status_stats == {} + assert result.child_execution_status_summary.total_executions == 0 + assert result.child_execution_status_summary.ended_executions == 0 + assert result.child_execution_status_summary.has_ended is True def test_direct_container_children(self): """Children that are direct container nodes (no descendants via ancestor links).""" @@ -117,6 +123,9 @@ def test_direct_container_children(self): assert stats[child1.id] == {"SUCCEEDED": 1} assert child2.id in stats assert stats[child2.id] == {"RUNNING": 1} + assert result.child_execution_status_summary.total_executions == 2 + assert result.child_execution_status_summary.ended_executions == 1 + assert result.child_execution_status_summary.has_ended is False def test_three_level_mixed_stats(self): """3-level deep graph with direct tasks and nested sub-graphs. @@ -302,6 +311,19 @@ def test_three_level_mixed_stats(self): # appear as a key in the stats assert sub_graph_a_b.id not in stats + # -- Summary: total_executions and ended_executions -- + # Direct children (Query 2): task_1(1), task_2(1), task_3(1) = 3 nodes + # Descendants of sub_graph_a (Query 1): 6 nodes + # CANCELLED(1), SKIPPED(1), RUNNING(1), INVALID(1), SYSTEM_ERROR(1), SUCCEEDED(1) + # Total = 3 + 6 = 9 + assert result.child_execution_status_summary.total_executions == 9 + # Ended statuses: SUCCEEDED(task_1) + FAILED(task_2) = 2 from Query 2 + # + CANCELLED(1) + SKIPPED(1) + INVALID(1) + SYSTEM_ERROR(1) + SUCCEEDED(1) = 5 from Query 1 + # QUEUED(task_3) and RUNNING(task_sg_a_3) are NOT ended + # Total ended = 2 + 5 = 7 + assert result.child_execution_status_summary.ended_executions == 7 + assert result.child_execution_status_summary.has_ended is False + if __name__ == "__main__": pytest.main()