Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/apify/_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,12 @@ async def __aenter__(self) -> Self:
self.log.debug('Event manager initialized')

# Initialize the charging manager.
await self._charging_manager_implementation.__aenter__()
try:
await self._charging_manager_implementation.__aenter__()
except BaseException:
# Exit the already-entered event manager so its recurring tasks do not leak.
await self.event_manager.__aexit__(None, None, None)
raise
self.log.debug('Charging manager initialized')

# Mark initialization as complete and update global state.
Expand Down
2 changes: 2 additions & 0 deletions src/apify/events/_apify_event_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ async def __aenter__(self) -> Self:
)
is_connected = await self._connected_to_platform_websocket
if not is_connected:
# Exit the already-entered parent so the recurring persist state task does not leak.
await self.__aexit__(None, None, None)
raise RuntimeError('Error connecting to platform events websocket!')
else:
logger.debug('APIFY_ACTOR_EVENTS_WS_URL env var not set, no events from Apify platform will be emitted.')
Expand Down
14 changes: 14 additions & 0 deletions tests/unit/actor/test_actor_lifecycle.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from ..._utils import poll_until_condition
from apify import Actor
from apify._charging import ChargingManagerImplementation
from apify._consts import EXIT_CODE_ERROR_USER_FUNCTION_THREW, ActorEnvVars, ApifyEnvVars

if TYPE_CHECKING:
Expand Down Expand Up @@ -112,6 +113,19 @@ async def test_fail_properly_deinitializes_actor(actor: _ActorType) -> None:
assert actor._active is False


async def test_failed_charging_manager_init_does_not_leak_event_manager() -> None:
"""Test that a failure in the charging manager's `__aenter__` also exits the already-entered event manager."""
actor = Actor()
with (
mock.patch.object(ChargingManagerImplementation, '__aenter__', side_effect=RuntimeError('Charging failed')),
pytest.raises(RuntimeError, match='Charging failed'),
):
await actor.init()

assert actor._active is False
assert actor.event_manager.active is False


async def test_actor_handles_exceptions_and_cleans_up_properly() -> None:
"""Test that Actor properly cleans up when an exception occurs in the async context manager."""
actor = None
Expand Down
5 changes: 5 additions & 0 deletions tests/unit/events/test_apify_event_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,13 +162,18 @@ async def event_handler(data: Any) -> None:


async def test_lifecycle_on_platform_without_websocket(monkeypatch: pytest.MonkeyPatch) -> None:
"""Test that a failed websocket connection raises and also exits the parent's recurring persist state task."""
monkeypatch.setenv(ActorEnvVars.EVENTS_WEBSOCKET_URL, 'ws://localhost:56565')
event_manager = ApifyEventManager(Configuration.get_global_configuration())

with pytest.raises(RuntimeError, match=r'Error connecting to platform events websocket!'):
async with event_manager:
pass

assert event_manager.active is False
persist_state_task = event_manager._emit_persist_state_event_rec_task.task
assert persist_state_task is None or persist_state_task.done()


async def test_lifecycle_on_platform(monkeypatch: pytest.MonkeyPatch) -> None:
async with (
Expand Down
Loading