From d69d0ace94e44e60e9f3b0aea27c08d63c45ddee Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Thu, 11 Jun 2026 15:49:38 +0200 Subject: [PATCH] fix: allow Actor.reboot() to be retried after a failed or cancelled attempt A failed reboot API call left the internal rebooting flag set, so every subsequent reboot() call was silently skipped for the rest of the run. --- src/apify/_actor.py | 67 ++++++++++++++------------ tests/unit/actor/test_actor_helpers.py | 59 +++++++++++++++++++++++ 2 files changed, 95 insertions(+), 31 deletions(-) diff --git a/src/apify/_actor.py b/src/apify/_actor.py index befd051d..e96b289d 100644 --- a/src/apify/_actor.py +++ b/src/apify/_actor.py @@ -1206,44 +1206,49 @@ async def reboot( self.log.debug('Actor is already rebooting, skipping the additional reboot call.') return - self._is_rebooting = True + if not self.configuration.actor_run_id: + raise RuntimeError('actor_run_id cannot be None when running on the Apify platform.') - if not custom_after_sleep: + if custom_after_sleep is None: custom_after_sleep = self.configuration.metamorph_after_sleep - # Call all the listeners for the PERSIST_STATE and MIGRATING events, and wait for them to finish. - # PERSIST_STATE listeners are called to allow the Actor to persist its state before the reboot. - # MIGRATING listeners are called to allow the Actor to gracefully stop in-progress tasks before the reboot. - # Typically, crawlers are listening for the MIIGRATING event to stop processing new requests. - # We can't just emit the events and wait for all listeners to finish, - # because this method might be called from an event listener itself, and we would deadlock. 
- persist_state_listeners = flatten( - (self.event_manager._listeners_to_wrappers[Event.PERSIST_STATE] or {}).values() # noqa: SLF001 - ) - migrating_listeners = flatten( - (self.event_manager._listeners_to_wrappers[Event.MIGRATING] or {}).values() # noqa: SLF001 - ) - - async def safe_dispatch(listener: Any, data: Any) -> None: - try: - await listener(data) - except Exception: - self.log.exception('A pre-reboot event listener failed') + self._is_rebooting = True - timeout = event_listeners_timeout.total_seconds() if event_listeners_timeout else None try: - async with asyncio.timeout(timeout), asyncio.TaskGroup() as tg: - for listener in persist_state_listeners: - tg.create_task(safe_dispatch(listener, EventPersistStateData(is_migrating=True))) - for listener in migrating_listeners: - tg.create_task(safe_dispatch(listener, EventMigratingData())) - except TimeoutError: - self.log.warning('Pre-reboot event listeners did not finish within timeout; proceeding with reboot') + # Call all the listeners for the PERSIST_STATE and MIGRATING events, and wait for them to finish. + # PERSIST_STATE listeners are called to allow the Actor to persist its state before the reboot. + # MIGRATING listeners are called to allow the Actor to gracefully stop in-progress tasks before + # the reboot. Typically, crawlers are listening for the MIGRATING event to stop processing new requests. + # We can't just emit the events and wait for all listeners to finish, + # because this method might be called from an event listener itself, and we would deadlock. 
+ persist_state_listeners = flatten( + (self.event_manager._listeners_to_wrappers[Event.PERSIST_STATE] or {}).values() # noqa: SLF001 + ) + migrating_listeners = flatten( + (self.event_manager._listeners_to_wrappers[Event.MIGRATING] or {}).values() # noqa: SLF001 + ) - if not self.configuration.actor_run_id: - raise RuntimeError('actor_run_id cannot be None when running on the Apify platform.') + async def safe_dispatch(listener: Any, data: Any) -> None: + try: + await listener(data) + except Exception: + self.log.exception('A pre-reboot event listener failed') - await self.apify_client.run(self.configuration.actor_run_id).reboot() + timeout = event_listeners_timeout.total_seconds() if event_listeners_timeout else None + try: + async with asyncio.timeout(timeout), asyncio.TaskGroup() as tg: + for listener in persist_state_listeners: + tg.create_task(safe_dispatch(listener, EventPersistStateData(is_migrating=True))) + for listener in migrating_listeners: + tg.create_task(safe_dispatch(listener, EventMigratingData())) + except TimeoutError: + self.log.warning('Pre-reboot event listeners did not finish within timeout; proceeding with reboot') + + await self.apify_client.run(self.configuration.actor_run_id).reboot() + except BaseException: + # Reset the flag so that a failed or cancelled reboot can be retried. + self._is_rebooting = False + raise if custom_after_sleep: await asyncio.sleep(custom_after_sleep.total_seconds()) diff --git a/tests/unit/actor/test_actor_helpers.py b/tests/unit/actor/test_actor_helpers.py index 633aa028..5cb76305 100644 --- a/tests/unit/actor/test_actor_helpers.py +++ b/tests/unit/actor/test_actor_helpers.py @@ -420,3 +420,62 @@ async def hanging_listener(*_args: object) -> None: # The reboot API call proceeded despite the hanging listener. 
assert len(apify_client_async_patcher.calls['run']['reboot']) == 1 + + +async def test_reboot_can_be_retried_after_failed_attempt( + apify_client_async_patcher: ApifyClientAsyncPatcher, +) -> None: + """Test that a failed reboot API call does not permanently disable subsequent reboot attempts.""" + attempts = 0 + + async def reboot_failing_once(*_args: object, **_kwargs: object) -> None: + nonlocal attempts + attempts += 1 + if attempts == 1: + raise RuntimeError('Reboot API error') + + apify_client_async_patcher.patch('run', 'reboot', replacement_method=reboot_failing_once) + + async with Actor: + Actor.configuration.is_at_home = True + Actor.configuration.actor_run_id = 'some-run-id' + + with pytest.raises(RuntimeError, match='Reboot API error'): + await Actor.reboot(custom_after_sleep=timedelta(milliseconds=1)) + + # The retry must reach the API again instead of being silently skipped. + await Actor.reboot(custom_after_sleep=timedelta(milliseconds=1)) + + assert len(apify_client_async_patcher.calls['run']['reboot']) == 2 + + +async def test_reboot_can_be_retried_after_cancelled_attempt( + apify_client_async_patcher: ApifyClientAsyncPatcher, +) -> None: + """Test that a cancelled reboot attempt does not permanently disable subsequent reboot attempts.""" + attempts = 0 + first_attempt_started = asyncio.Event() + + async def reboot_hanging_once(*_args: object, **_kwargs: object) -> None: + nonlocal attempts + attempts += 1 + if attempts == 1: + first_attempt_started.set() + await asyncio.sleep(60) + + apify_client_async_patcher.patch('run', 'reboot', replacement_method=reboot_hanging_once) + + async with Actor: + Actor.configuration.is_at_home = True + Actor.configuration.actor_run_id = 'some-run-id' + + reboot_task = asyncio.create_task(Actor.reboot(custom_after_sleep=timedelta(milliseconds=1))) + await first_attempt_started.wait() + reboot_task.cancel() + with pytest.raises(asyncio.CancelledError): + await reboot_task + + # The retry must reach the API again 
instead of being silently skipped. + await Actor.reboot(custom_after_sleep=timedelta(milliseconds=1)) + + assert len(apify_client_async_patcher.calls['run']['reboot']) == 2