diff --git a/src/apify/_actor.py b/src/apify/_actor.py index befd051d..2aef76d0 100644 --- a/src/apify/_actor.py +++ b/src/apify/_actor.py @@ -197,7 +197,12 @@ async def __aenter__(self) -> Self: self.log.debug('Event manager initialized') # Initialize the charging manager. - await self._charging_manager_implementation.__aenter__() + try: + await self._charging_manager_implementation.__aenter__() + except BaseException: + # Exit the already-entered event manager so its recurring tasks do not leak. + await self.event_manager.__aexit__(None, None, None) + raise self.log.debug('Charging manager initialized') # Mark initialization as complete and update global state. diff --git a/src/apify/events/_apify_event_manager.py b/src/apify/events/_apify_event_manager.py index 21707875..ca51618a 100644 --- a/src/apify/events/_apify_event_manager.py +++ b/src/apify/events/_apify_event_manager.py @@ -78,6 +78,8 @@ async def __aenter__(self) -> Self: ) is_connected = await self._connected_to_platform_websocket if not is_connected: + # Exit the already-entered parent so the recurring persist state task does not leak. + await self.__aexit__(None, None, None) raise RuntimeError('Error connecting to platform events websocket!') else: logger.debug('APIFY_ACTOR_EVENTS_WS_URL env var not set, no events from Apify platform will be emitted.') diff --git a/tests/unit/actor/test_actor_lifecycle.py b/tests/unit/actor/test_actor_lifecycle.py index a57de66a..0575a211 100644 --- a/tests/unit/actor/test_actor_lifecycle.py +++ b/tests/unit/actor/test_actor_lifecycle.py @@ -18,6 +18,7 @@ from ..._utils import poll_until_condition from apify import Actor +from apify._charging import ChargingManagerImplementation from apify._consts import EXIT_CODE_ERROR_USER_FUNCTION_THREW, ActorEnvVars, ApifyEnvVars if TYPE_CHECKING: @@ -112,6 +113,19 @@ async def test_fail_properly_deinitializes_actor(actor: _ActorType) -> None: assert actor._active is False +async def test_failed_charging_manager_init_does_not_leak_event_manager() -> None: + """Test that a failure in the charging manager's `__aenter__` also exits the already-entered event manager.""" + actor = Actor() + with ( + mock.patch.object(ChargingManagerImplementation, '__aenter__', side_effect=RuntimeError('Charging failed')), + pytest.raises(RuntimeError, match='Charging failed'), + ): + await actor.init() + + assert actor._active is False + assert actor.event_manager.active is False + + async def test_actor_handles_exceptions_and_cleans_up_properly() -> None: """Test that Actor properly cleans up when an exception occurs in the async context manager.""" actor = None diff --git a/tests/unit/events/test_apify_event_manager.py b/tests/unit/events/test_apify_event_manager.py index 13568f43..21ed00bd 100644 --- a/tests/unit/events/test_apify_event_manager.py +++ b/tests/unit/events/test_apify_event_manager.py @@ -162,6 +162,7 @@ async def event_handler(data: Any) -> None: async def test_lifecycle_on_platform_without_websocket(monkeypatch: pytest.MonkeyPatch) -> None: + """Test that a failed websocket connection raises and also exits the parent's recurring persist state task.""" monkeypatch.setenv(ActorEnvVars.EVENTS_WEBSOCKET_URL, 'ws://localhost:56565') event_manager = ApifyEventManager(Configuration.get_global_configuration()) @@ -169,6 +170,10 @@ async def test_lifecycle_on_platform_without_websocket(monkeypatch: pytest.Monke async with event_manager: pass + assert event_manager.active is False + persist_state_task = event_manager._emit_persist_state_event_rec_task.task + assert persist_state_task is None or persist_state_task.done() + async def test_lifecycle_on_platform(monkeypatch: pytest.MonkeyPatch) -> None: async with (