diff --git a/src/crawlee/request_loaders/_request_manager_tandem.py b/src/crawlee/request_loaders/_request_manager_tandem.py index 4f33d41cf3..3798e2e49a 100644 --- a/src/crawlee/request_loaders/_request_manager_tandem.py +++ b/src/crawlee/request_loaders/_request_manager_tandem.py @@ -89,6 +89,9 @@ async def fetch_next_request(self) -> Request | None: 'Adding request from the RequestLoader to the RequestManager failed, the request has been dropped', extra={'url': request.url, 'unique_key': request.unique_key}, ) + # Mark it as processed so that the `request` doesn't get stuck in the `in_progress` status + # in `RequestLoader` + await self._read_only_loader.mark_request_as_handled(request) return None await self._read_only_loader.mark_request_as_handled(request) diff --git a/src/crawlee/request_loaders/_sitemap_request_loader.py b/src/crawlee/request_loaders/_sitemap_request_loader.py index 06f2c29111..e1c4c4d2e1 100644 --- a/src/crawlee/request_loaders/_sitemap_request_loader.py +++ b/src/crawlee/request_loaders/_sitemap_request_loader.py @@ -160,7 +160,11 @@ def __init__( async def _get_state(self) -> SitemapRequestLoaderState: """Initialize and return the current state.""" + if self._state.is_initialized: + return self._state.current_value + async with self._queue_lock: + # Re-check if state got initialized while waiting for the lock if self._state.is_initialized: return self._state.current_value @@ -260,7 +264,6 @@ async def _load_sitemaps(self) -> None: # Check if we have capacity in the queue await self._queue_has_capacity.wait() - state = await self._get_state() async with self._queue_lock: state.url_queue.append(url) state.current_sitemap_processed_urls.add(url) @@ -318,8 +321,16 @@ async def fetch_next_request(self) -> Request | None: continue async with self._queue_lock: + # Double-check if the queue is still not empty after acquiring the lock + if not state.url_queue: + continue + url = state.url_queue.popleft() request_option = RequestOptions(url=url) + + if len(state.url_queue) < self._max_buffer_size: + self._queue_has_capacity.set() + if self._transform_request_function: transform_request_option = self._transform_request_function(request_option) if transform_request_option == 'skip': @@ -327,10 +338,9 @@ async def fetch_next_request(self) -> Request | None: continue if transform_request_option != 'unchanged': request_option = transform_request_option + request = Request.from_url(**request_option) state.in_progress.add(request.url) - if len(state.url_queue) < self._max_buffer_size: - self._queue_has_capacity.set() return request diff --git a/tests/unit/request_loaders/test_sitemap_request_loader.py b/tests/unit/request_loaders/test_sitemap_request_loader.py index a2e765ac36..8033211157 100644 --- a/tests/unit/request_loaders/test_sitemap_request_loader.py +++ b/tests/unit/request_loaders/test_sitemap_request_loader.py @@ -2,6 +2,7 @@ import base64 import gzip from typing import TYPE_CHECKING +from unittest.mock import patch from yarl import URL @@ -216,3 +217,79 @@ def transform_request(request_options: RequestOptions) -> RequestOptions | Reque 'http://not-exists.com/catalog?item=74&desc=vacation_newfoundland', 'http://not-exists.com/catalog?item=83&desc=vacation_usa', } + + +async def test_transform_request_function_with_skip(server_url: URL, http_client: HttpClient) -> None: + sitemap_url = (server_url / 'sitemap.xml').with_query(base64=encode_base64(BASIC_SITEMAP.encode())) + + def transform_request(_request_options: RequestOptions) -> RequestOptions | RequestTransformAction: + return 'skip' + + sitemap_loader = SitemapRequestLoader( + [str(sitemap_url)], + http_client=http_client, + transform_request_function=transform_request, + ) + + while not await sitemap_loader.is_finished(): + request = await sitemap_loader.fetch_next_request() + + if request: + await sitemap_loader.mark_request_as_handled(request) + + # Even though the sitemap had URLs, all were skipped, so the loader should be empty and finished with + # 0 handled requests. + assert await sitemap_loader.is_empty() + assert await sitemap_loader.is_finished() + assert await sitemap_loader.get_total_count() == 0 + assert await sitemap_loader.get_handled_count() == 0 + + +async def test_sitemap_loader_to_tandem( + server_url: URL, + http_client: HttpClient, +) -> None: + sitemap_url = (server_url / 'sitemap.xml').with_query(base64=encode_base64(BASIC_SITEMAP.encode())) + + sitemap_loader = SitemapRequestLoader([str(sitemap_url)], http_client=http_client) + request_manager = await sitemap_loader.to_tandem() + + while not await sitemap_loader.is_finished(): + request = await request_manager.fetch_next_request() + + if request: + await request_manager.mark_request_as_handled(request) + + assert await sitemap_loader.is_empty() + assert await sitemap_loader.is_finished() + + assert await request_manager.is_empty() + assert await request_manager.is_finished() + + +async def test_sitemap_loader_to_tandem_with_request_dropped( + server_url: URL, + http_client: HttpClient, +) -> None: + sitemap_url = (server_url / 'sitemap.xml').with_query(base64=encode_base64(BASIC_SITEMAP.encode())) + + sitemap_loader = SitemapRequestLoader( + [str(sitemap_url)], + http_client=http_client, + ) + request_manager = await sitemap_loader.to_tandem() + + with patch.object( + request_manager._read_write_manager, 'add_request', side_effect=Exception('Failed to add request') + ): + while not await sitemap_loader.is_finished(): + request = await request_manager.fetch_next_request() + + if request: + await request_manager.mark_request_as_handled(request) + + assert await sitemap_loader.is_empty() + assert await sitemap_loader.is_finished() + + assert await request_manager.is_empty() + assert await request_manager.is_finished()