Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/crawlee/request_loaders/_request_manager_tandem.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ async def fetch_next_request(self) -> Request | None:
'Adding request from the RequestLoader to the RequestManager failed, the request has been dropped',
extra={'url': request.url, 'unique_key': request.unique_key},
)
# Mark it as processed so that the `request` doesn't get stuck in the `in_progress` status
# in `RequestLoader`
await self._read_only_loader.mark_request_as_handled(request)
return None

await self._read_only_loader.mark_request_as_handled(request)
Expand Down
16 changes: 13 additions & 3 deletions src/crawlee/request_loaders/_sitemap_request_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,11 @@ def __init__(

async def _get_state(self) -> SitemapRequestLoaderState:
"""Initialize and return the current state."""
if self._state.is_initialized:
return self._state.current_value

async with self._queue_lock:
# Re-check if state got initialized while waiting for the lock
if self._state.is_initialized:
return self._state.current_value

Expand Down Expand Up @@ -260,7 +264,6 @@ async def _load_sitemaps(self) -> None:
# Check if we have capacity in the queue
await self._queue_has_capacity.wait()

state = await self._get_state()
async with self._queue_lock:
state.url_queue.append(url)
state.current_sitemap_processed_urls.add(url)
Expand Down Expand Up @@ -318,19 +321,26 @@ async def fetch_next_request(self) -> Request | None:
continue

async with self._queue_lock:
# Double-check if the queue is still not empty after acquiring the lock
if not state.url_queue:
continue

url = state.url_queue.popleft()
request_option = RequestOptions(url=url)

if len(state.url_queue) < self._max_buffer_size:
self._queue_has_capacity.set()

if self._transform_request_function:
transform_request_option = self._transform_request_function(request_option)
if transform_request_option == 'skip':
state.total_count -= 1
continue
if transform_request_option != 'unchanged':
request_option = transform_request_option

request = Request.from_url(**request_option)
state.in_progress.add(request.url)
if len(state.url_queue) < self._max_buffer_size:
self._queue_has_capacity.set()

return request

Expand Down
77 changes: 77 additions & 0 deletions tests/unit/request_loaders/test_sitemap_request_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import base64
import gzip
from typing import TYPE_CHECKING
from unittest.mock import patch

from yarl import URL

Expand Down Expand Up @@ -216,3 +217,79 @@ def transform_request(request_options: RequestOptions) -> RequestOptions | Reque
'http://not-exists.com/catalog?item=74&desc=vacation_newfoundland',
'http://not-exists.com/catalog?item=83&desc=vacation_usa',
}


async def test_transform_request_function_with_skip(server_url: URL, http_client: HttpClient) -> None:
    """A transform that skips every URL must leave the loader empty with zero counts."""
    encoded_sitemap = encode_base64(BASIC_SITEMAP.encode())
    sitemap_url = (server_url / 'sitemap.xml').with_query(base64=encoded_sitemap)

    def transform_request(_request_options: RequestOptions) -> RequestOptions | RequestTransformAction:
        # Reject every candidate URL unconditionally.
        return 'skip'

    sitemap_loader = SitemapRequestLoader(
        [str(sitemap_url)],
        http_client=http_client,
        transform_request_function=transform_request,
    )

    # Drain the loader; nothing should ever be returned, but handle it defensively if it is.
    while not await sitemap_loader.is_finished():
        if (request := await sitemap_loader.fetch_next_request()) is not None:
            await sitemap_loader.mark_request_as_handled(request)

    # Even though the sitemap had URLs, all were skipped, so the loader should be empty and finished with
    # 0 handled requests.
    assert await sitemap_loader.is_empty()
    assert await sitemap_loader.is_finished()
    assert await sitemap_loader.get_total_count() == 0
    assert await sitemap_loader.get_handled_count() == 0


async def test_sitemap_loader_to_tandem(
    server_url: URL,
    http_client: HttpClient,
) -> None:
    """Draining a tandem built from a sitemap loader finishes both the loader and the manager."""
    encoded_sitemap = encode_base64(BASIC_SITEMAP.encode())
    sitemap_url = (server_url / 'sitemap.xml').with_query(base64=encoded_sitemap)

    sitemap_loader = SitemapRequestLoader([str(sitemap_url)], http_client=http_client)
    request_manager = await sitemap_loader.to_tandem()

    # Consume everything through the tandem until the underlying loader reports completion.
    while not await sitemap_loader.is_finished():
        if (request := await request_manager.fetch_next_request()) is not None:
            await request_manager.mark_request_as_handled(request)

    # Both sides of the tandem should agree that all work is done.
    assert await sitemap_loader.is_empty()
    assert await sitemap_loader.is_finished()

    assert await request_manager.is_empty()
    assert await request_manager.is_finished()


async def test_sitemap_loader_to_tandem_with_request_dropped(
    server_url: URL,
    http_client: HttpClient,
) -> None:
    """Even when every hand-off to the manager fails, the tandem must still drain to completion."""
    encoded_sitemap = encode_base64(BASIC_SITEMAP.encode())
    sitemap_url = (server_url / 'sitemap.xml').with_query(base64=encoded_sitemap)

    sitemap_loader = SitemapRequestLoader(
        [str(sitemap_url)],
        http_client=http_client,
    )
    request_manager = await sitemap_loader.to_tandem()

    # Force every add_request call on the manager side to fail, so each fetched request is dropped.
    with patch.object(
        request_manager._read_write_manager, 'add_request', side_effect=Exception('Failed to add request')
    ):
        while not await sitemap_loader.is_finished():
            if (request := await request_manager.fetch_next_request()) is not None:
                await request_manager.mark_request_as_handled(request)

    # Dropped requests must not wedge the loader or the manager in an unfinished state.
    assert await sitemap_loader.is_empty()
    assert await sitemap_loader.is_finished()

    assert await request_manager.is_empty()
    assert await request_manager.is_finished()
Loading