Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 36 additions & 31 deletions src/apify/_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1206,44 +1206,49 @@ async def reboot(
self.log.debug('Actor is already rebooting, skipping the additional reboot call.')
return

self._is_rebooting = True
if not self.configuration.actor_run_id:
raise RuntimeError('actor_run_id cannot be None when running on the Apify platform.')

if not custom_after_sleep:
if custom_after_sleep is None:
custom_after_sleep = self.configuration.metamorph_after_sleep

# Call all the listeners for the PERSIST_STATE and MIGRATING events, and wait for them to finish.
# PERSIST_STATE listeners are called to allow the Actor to persist its state before the reboot.
# MIGRATING listeners are called to allow the Actor to gracefully stop in-progress tasks before the reboot.
# Typically, crawlers are listening for the MIGRATING event to stop processing new requests.
# We can't just emit the events and wait for all listeners to finish,
# because this method might be called from an event listener itself, and we would deadlock.
persist_state_listeners = flatten(
(self.event_manager._listeners_to_wrappers[Event.PERSIST_STATE] or {}).values() # noqa: SLF001
)
migrating_listeners = flatten(
(self.event_manager._listeners_to_wrappers[Event.MIGRATING] or {}).values() # noqa: SLF001
)

async def safe_dispatch(listener: Any, data: Any) -> None:
try:
await listener(data)
except Exception:
self.log.exception('A pre-reboot event listener failed')
self._is_rebooting = True

timeout = event_listeners_timeout.total_seconds() if event_listeners_timeout else None
try:
async with asyncio.timeout(timeout), asyncio.TaskGroup() as tg:
for listener in persist_state_listeners:
tg.create_task(safe_dispatch(listener, EventPersistStateData(is_migrating=True)))
for listener in migrating_listeners:
tg.create_task(safe_dispatch(listener, EventMigratingData()))
except TimeoutError:
self.log.warning('Pre-reboot event listeners did not finish within timeout; proceeding with reboot')
# Call all the listeners for the PERSIST_STATE and MIGRATING events, and wait for them to finish.
# PERSIST_STATE listeners are called to allow the Actor to persist its state before the reboot.
# MIGRATING listeners are called to allow the Actor to gracefully stop in-progress tasks before
# the reboot. Typically, crawlers are listening for the MIGRATING event to stop processing new requests.
# We can't just emit the events and wait for all listeners to finish,
# because this method might be called from an event listener itself, and we would deadlock.
persist_state_listeners = flatten(
(self.event_manager._listeners_to_wrappers[Event.PERSIST_STATE] or {}).values() # noqa: SLF001
)
migrating_listeners = flatten(
(self.event_manager._listeners_to_wrappers[Event.MIGRATING] or {}).values() # noqa: SLF001
)

if not self.configuration.actor_run_id:
raise RuntimeError('actor_run_id cannot be None when running on the Apify platform.')
async def safe_dispatch(listener: Any, data: Any) -> None:
try:
await listener(data)
except Exception:
self.log.exception('A pre-reboot event listener failed')

await self.apify_client.run(self.configuration.actor_run_id).reboot()
timeout = event_listeners_timeout.total_seconds() if event_listeners_timeout else None
try:
async with asyncio.timeout(timeout), asyncio.TaskGroup() as tg:
for listener in persist_state_listeners:
tg.create_task(safe_dispatch(listener, EventPersistStateData(is_migrating=True)))
for listener in migrating_listeners:
tg.create_task(safe_dispatch(listener, EventMigratingData()))
except TimeoutError:
self.log.warning('Pre-reboot event listeners did not finish within timeout; proceeding with reboot')

await self.apify_client.run(self.configuration.actor_run_id).reboot()
except BaseException:
# Reset the flag so that a failed or cancelled reboot can be retried.
self._is_rebooting = False
raise

if custom_after_sleep:
await asyncio.sleep(custom_after_sleep.total_seconds())
Expand Down
59 changes: 59 additions & 0 deletions tests/unit/actor/test_actor_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,3 +420,62 @@ async def hanging_listener(*_args: object) -> None:

# The reboot API call proceeded despite the hanging listener.
assert len(apify_client_async_patcher.calls['run']['reboot']) == 1


async def test_reboot_can_be_retried_after_failed_attempt(
    apify_client_async_patcher: ApifyClientAsyncPatcher,
) -> None:
    """Verify that `Actor.reboot` can be called again after the reboot API call raised.

    The patched reboot endpoint raises on its first invocation and succeeds afterwards.
    The first `Actor.reboot` call must propagate the error, and the second one must
    reach the (patched) API again rather than being skipped.
    """
    call_count = 0

    async def fail_on_first_call(*_args: object, **_kwargs: object) -> None:
        # Only the very first invocation fails; every later one returns normally.
        nonlocal call_count
        call_count += 1
        if call_count > 1:
            return
        raise RuntimeError('Reboot API error')

    apify_client_async_patcher.patch('run', 'reboot', replacement_method=fail_on_first_call)
    after_sleep = timedelta(milliseconds=1)

    async with Actor:
        Actor.configuration.is_at_home = True
        Actor.configuration.actor_run_id = 'some-run-id'

        # First attempt: the API error must surface to the caller.
        with pytest.raises(RuntimeError, match='Reboot API error'):
            await Actor.reboot(custom_after_sleep=after_sleep)

        # Second attempt: must hit the API again instead of being silently skipped.
        await Actor.reboot(custom_after_sleep=after_sleep)

    # Both the failed attempt and the retry were recorded by the patcher.
    assert len(apify_client_async_patcher.calls['run']['reboot']) == 2


async def test_reboot_can_be_retried_after_cancelled_attempt(
    apify_client_async_patcher: ApifyClientAsyncPatcher,
) -> None:
    """Verify that `Actor.reboot` can be called again after a previous attempt was cancelled.

    The patched reboot endpoint hangs on its first invocation so that the surrounding
    task can be cancelled while the API call is in flight. A subsequent `Actor.reboot`
    call must reach the (patched) API again rather than being skipped.
    """
    call_count = 0
    api_call_in_flight = asyncio.Event()

    async def hang_on_first_call(*_args: object, **_kwargs: object) -> None:
        # Only the very first invocation blocks; every later one returns immediately.
        nonlocal call_count
        call_count += 1
        if call_count != 1:
            return
        # Signal the test body that the API call has started, then block until cancelled.
        api_call_in_flight.set()
        await asyncio.sleep(60)

    apify_client_async_patcher.patch('run', 'reboot', replacement_method=hang_on_first_call)
    after_sleep = timedelta(milliseconds=1)

    async with Actor:
        Actor.configuration.is_at_home = True
        Actor.configuration.actor_run_id = 'some-run-id'

        # First attempt: run in a task, wait until it is inside the API call, then cancel it.
        pending_reboot = asyncio.create_task(Actor.reboot(custom_after_sleep=after_sleep))
        await api_call_in_flight.wait()
        pending_reboot.cancel()
        with pytest.raises(asyncio.CancelledError):
            await pending_reboot

        # Second attempt: must hit the API again instead of being silently skipped.
        await Actor.reboot(custom_after_sleep=after_sleep)

    # Both the cancelled attempt and the retry were recorded by the patcher.
    assert len(apify_client_async_patcher.calls['run']['reboot']) == 2
Loading