From 1de6921a3dc9c1d6a0e14890f18845a433684b3d Mon Sep 17 00:00:00 2001 From: Zhao Xintong Date: Mon, 1 Jun 2026 15:35:26 +0800 Subject: [PATCH 1/6] add in-process URL pool caching --- lightllm/utils/envs_utils.py | 5 ++ lightllm/utils/multimodal_utils.py | 108 ++++++++++++++++++++++++----- 2 files changed, 94 insertions(+), 19 deletions(-) diff --git a/lightllm/utils/envs_utils.py b/lightllm/utils/envs_utils.py index 350507e897..1de1e79996 100644 --- a/lightllm/utils/envs_utils.py +++ b/lightllm/utils/envs_utils.py @@ -228,3 +228,8 @@ def get_added_mtp_kv_layer_num() -> int: @lru_cache(maxsize=None) def get_pd_split_max_new_tokens() -> int: return int(os.getenv("LIGHTLLM_PD_SPLIT_MAX_NEW_TOKENS", 2048)) + + +@lru_cache(maxsize=None) +def get_lightllm_url_pool_maxsize() -> int: + return int(os.getenv("LIGHTLLM_URL_POOL_MAXSIZE", 256)) diff --git a/lightllm/utils/multimodal_utils.py b/lightllm/utils/multimodal_utils.py index 876283b931..31214f6da4 100644 --- a/lightllm/utils/multimodal_utils.py +++ b/lightllm/utils/multimodal_utils.py @@ -6,11 +6,75 @@ from io import BytesIO from fastapi import Request from functools import lru_cache +from collections import OrderedDict +from typing import Awaitable, Callable, Dict, Optional, Tuple +import asyncio from lightllm.utils.error_utils import ClientDisconnected +from lightllm.utils.envs_utils import get_lightllm_url_pool_maxsize from lightllm.utils.log_utils import init_logger logger = init_logger(__name__) +class UrlResourcePool: + def __init__(self, maxsize: int = 256): + self._maxsize = maxsize + self._cache: "OrderedDict[Tuple[str, Optional[str]], bytes]" = OrderedDict() + self._inflight: Dict[Tuple[str, Optional[str]], asyncio.Future] = {} + self._lock = asyncio.Lock() + + @staticmethod + def _normalize_url(url: str) -> str: + return url.strip() + + async def get_or_create(self, url: str, proxy: Optional[str], loader: Callable[[], Awaitable[bytes]]) -> bytes: + key = (self._normalize_url(url), proxy) + + async with self._lock: + cached = self._cache.get(key) + if cached is not None: + self._cache.move_to_end(key) + logger.info(f"url_pool hit") + return cached + + inflight = self._inflight.get(key) + if inflight is None: + inflight = asyncio.get_running_loop().create_future() + self._inflight[key] = inflight + creator = True + else: + creator = False + + if not creator: + return await inflight + + try: + content = await loader() + except BaseException as exc: + async with self._lock: + current = self._inflight.get(key) + if current is inflight: + self._inflight.pop(key, None) + if not inflight.done(): + inflight.set_exception(exc) + inflight.exception() + raise + + async with self._lock: + self._cache[key] = content + self._cache.move_to_end(key) + while len(self._cache) > self._maxsize: + self._cache.popitem(last=False) + + current = self._inflight.get(key) + if current is inflight: + self._inflight.pop(key, None) + if not inflight.done(): + inflight.set_result(content) + + return content + +URL_RESOURCE_POOL = UrlResourcePool(maxsize=get_lightllm_url_pool_maxsize()) + def _httpx_async_client_proxy_kwargs(proxy) -> dict: """ @@ -45,23 +109,29 @@ def _get_xhttp_client(proxy=None): async def fetch_resource(url, request: Request, timeout, proxy=None): - logger.info(f"Begin to download resource from url: {url}") + if request is not None and await request.is_disconnected(): + raise ClientDisconnected(reason=f"client disconnected during url download") + start_time = time.time() - client = _get_xhttp_client(proxy) - async with client.stream("GET", url, timeout=timeout) as response: - response.raise_for_status() - ans_bytes = [] - async for chunk in response.aiter_bytes(chunk_size=1024 * 1024): - if request is not None and await request.is_disconnected(): - await response.aclose() - raise ClientDisconnected(reason=f"client disconnected during download of {url}") - ans_bytes.append(chunk) - # 接收的数据不能大于128M - if len(ans_bytes) > 128: - raise Exception(f"url {url} recv data is too big") - - content = b"".join(ans_bytes) - end_time = time.time() - cost_time = end_time - start_time - logger.info(f"Download url {url} resource cost time: {cost_time} seconds") - return content + + async def _load() -> bytes: + client = _get_xhttp_client(proxy) + async with client.stream("GET", url, timeout=timeout) as response: + response.raise_for_status() + ans_bytes = [] + async for chunk in response.aiter_bytes(chunk_size=1024 * 1024): + if request is not None and await request.is_disconnected(): + await response.aclose() + raise ClientDisconnected(reason=f"client disconnected during url download") + ans_bytes.append(chunk) + # 接收的数据不能大于128M + if len(ans_bytes) > 128: + raise Exception("url data is too big") + + content = b"".join(ans_bytes) + end_time = time.time() + cost_time = end_time - start_time + logger.info(f"url download done, cost={cost_time:.3f}s") + return content + + return await URL_RESOURCE_POOL.get_or_create(url, proxy, _load) From eb03373a77fe3493a43ee524f22b25e44353c066 Mon Sep 17 00:00:00 2001 From: Zhao Xintong Date: Mon, 1 Jun 2026 15:49:36 +0800 Subject: [PATCH 2/6] fix format --- lightllm/utils/multimodal_utils.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/lightllm/utils/multimodal_utils.py b/lightllm/utils/multimodal_utils.py index 31214f6da4..3970decc6e 100644 --- a/lightllm/utils/multimodal_utils.py +++ b/lightllm/utils/multimodal_utils.py @@ -15,6 +15,7 @@ logger = init_logger(__name__) + class UrlResourcePool: def __init__(self, maxsize: int = 256): self._maxsize = maxsize @@ -26,7 +27,9 @@ def __init__(self, maxsize: int = 256): def _normalize_url(url: str) -> str: return url.strip() - async def get_or_create(self, url: str, proxy: Optional[str], loader: Callable[[], Awaitable[bytes]]) -> bytes: + async def get_or_create( + self, url: str, proxy: Optional[str], loader: Callable[[], Awaitable[bytes]] + ) -> bytes: key = (self._normalize_url(url), proxy) async with self._lock: @@ -73,6 +76,7 @@ async def get_or_create(self, url: str, proxy: Optional[str], loader: Callable[[ return content + URL_RESOURCE_POOL = UrlResourcePool(maxsize=get_lightllm_url_pool_maxsize()) @@ -122,7 +126,9 @@ async def _load() -> bytes: async for chunk in response.aiter_bytes(chunk_size=1024 * 1024): if request is not None and await request.is_disconnected(): await response.aclose() - raise ClientDisconnected(reason=f"client disconnected during url download") + raise ClientDisconnected( + reason=f"client disconnected during url download" + ) ans_bytes.append(chunk) # 接收的数据不能大于128M if len(ans_bytes) > 128: From 6b3a48ab9e3bdba09c8476a160ad0466b07b858a Mon Sep 17 00:00:00 2001 From: Zhao Xintong Date: Mon, 1 Jun 2026 16:16:24 +0800 Subject: [PATCH 3/6] fix --- lightllm/utils/multimodal_utils.py | 67 +++++++++++++----------------- 1 file changed, 28 insertions(+), 39 deletions(-) diff --git a/lightllm/utils/multimodal_utils.py b/lightllm/utils/multimodal_utils.py index 3970decc6e..6e6999692e 100644 --- a/lightllm/utils/multimodal_utils.py +++ b/lightllm/utils/multimodal_utils.py @@ -20,7 +20,7 @@ class UrlResourcePool: def __init__(self, maxsize: int = 256): self._maxsize = maxsize self._cache: "OrderedDict[Tuple[str, Optional[str]], bytes]" = OrderedDict() - self._inflight: Dict[Tuple[str, Optional[str]], asyncio.Future] = {} + self._inflight: Dict[Tuple[str, Optional[str]], asyncio.Task] = {} self._lock = asyncio.Lock() @staticmethod @@ -39,42 +39,36 @@ async def get_or_create( logger.info(f"url_pool hit") return cached - inflight = self._inflight.get(key) - if inflight is None: - inflight = asyncio.get_running_loop().create_future() - self._inflight[key] = inflight - creator = True - else: - creator = False - - if not creator: - return await inflight - - try: - content = await loader() - except BaseException as exc: - async with self._lock: - current = self._inflight.get(key) - if current is inflight: - self._inflight.pop(key, None) - if not inflight.done(): - inflight.set_exception(exc) - inflight.exception() - raise + task = self._inflight.get(key) + if task is None: - async with self._lock: - self._cache[key] = content - self._cache.move_to_end(key) - while len(self._cache) > self._maxsize: - self._cache.popitem(last=False) + async def _run_and_cache() -> bytes: + try: + content = await loader() + async with self._lock: + self._cache[key] = content + self._cache.move_to_end(key) + while len(self._cache) > self._maxsize: + self._cache.popitem(last=False) + return content + finally: + async with self._lock: + self._inflight.pop(key, None) - current = self._inflight.get(key) - if current is inflight: - self._inflight.pop(key, None) - if not inflight.done(): - inflight.set_result(content) + task = asyncio.create_task(_run_and_cache()) - return content + def _consume_task_exception(completed_task: asyncio.Task) -> None: + if completed_task.cancelled(): + return + try: + completed_task.exception() + except BaseException: + return + + task.add_done_callback(_consume_task_exception) + self._inflight[key] = task + + return await asyncio.shield(task) URL_RESOURCE_POOL = UrlResourcePool(maxsize=get_lightllm_url_pool_maxsize()) @@ -124,11 +118,6 @@ async def _load() -> bytes: response.raise_for_status() ans_bytes = [] async for chunk in response.aiter_bytes(chunk_size=1024 * 1024): - if request is not None and await request.is_disconnected(): - await response.aclose() - raise ClientDisconnected( - reason=f"client disconnected during url download" - ) ans_bytes.append(chunk) # 接收的数据不能大于128M if len(ans_bytes) > 128: From 86c9bc106a1e1bf18d6a6d23e24aaed32c47b1b3 Mon Sep 17 00:00:00 2001 From: Zhao Xintong Date: Mon, 1 Jun 2026 16:27:13 +0800 Subject: [PATCH 4/6] fix --- lightllm/utils/multimodal_utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/lightllm/utils/multimodal_utils.py b/lightllm/utils/multimodal_utils.py index 6e6999692e..525a6641f6 100644 --- a/lightllm/utils/multimodal_utils.py +++ b/lightllm/utils/multimodal_utils.py @@ -107,6 +107,7 @@ def _get_xhttp_client(proxy=None): async def fetch_resource(url, request: Request, timeout, proxy=None): + logger.info(f"Begin to download resource from url: {url}") if request is not None and await request.is_disconnected(): raise ClientDisconnected(reason=f"client disconnected during url download") From cf2c1296897db9c656f08a39ac85fb5b7f18d945 Mon Sep 17 00:00:00 2001 From: Zhao Xintong Date: Mon, 1 Jun 2026 16:41:46 +0800 Subject: [PATCH 5/6] fix --- lightllm/utils/multimodal_utils.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/lightllm/utils/multimodal_utils.py b/lightllm/utils/multimodal_utils.py index 525a6641f6..533b21dd44 100644 --- a/lightllm/utils/multimodal_utils.py +++ b/lightllm/utils/multimodal_utils.py @@ -27,9 +27,7 @@ def __init__(self, maxsize: int = 256): def _normalize_url(url: str) -> str: return url.strip() - async def get_or_create( - self, url: str, proxy: Optional[str], loader: Callable[[], Awaitable[bytes]] - ) -> bytes: + async def get_or_create(self, url: str, proxy: Optional[str], loader: Callable[[], Awaitable[bytes]]) -> bytes: key = (self._normalize_url(url), proxy) async with self._lock: From 9e3966c077aa5363bcfd3f43e84125fc2245eabf Mon Sep 17 00:00:00 2001 From: Zhao Xintong Date: Mon, 1 Jun 2026 17:20:16 +0800 Subject: [PATCH 6/6] fix --- lightllm/utils/multimodal_utils.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/lightllm/utils/multimodal_utils.py b/lightllm/utils/multimodal_utils.py index 533b21dd44..eaf598aefc 100644 --- a/lightllm/utils/multimodal_utils.py +++ b/lightllm/utils/multimodal_utils.py @@ -56,12 +56,8 @@ async def _run_and_cache() -> bytes: task = asyncio.create_task(_run_and_cache()) def _consume_task_exception(completed_task: asyncio.Task) -> None: - if completed_task.cancelled(): - return - try: + if not completed_task.cancelled(): completed_task.exception() - except BaseException: - return task.add_done_callback(_consume_task_exception) self._inflight[key] = task