From 5c1ba6fb6033847dd6947bc713e57ebee53013b1 Mon Sep 17 00:00:00 2001 From: redactdeveloper <283494121+redactdeveloper@users.noreply.github.com> Date: Mon, 1 Jun 2026 06:44:12 +0300 Subject: [PATCH 1/3] fix: avoid response parse validator leak (closes #3084) --- src/openai/lib/_parsing/_responses.py | 6 +-- tests/lib/responses/test_responses.py | 62 +++++++++++++++++++++++++++ 2 files changed, 65 insertions(+), 3 deletions(-) diff --git a/src/openai/lib/_parsing/_responses.py b/src/openai/lib/_parsing/_responses.py index 8853a0749f..df48047d1c 100644 --- a/src/openai/lib/_parsing/_responses.py +++ b/src/openai/lib/_parsing/_responses.py @@ -68,7 +68,7 @@ def parse_response( content_list.append( construct_type_unchecked( - type_=ParsedResponseOutputText[TextFormatT], + type_=ParsedResponseOutputText, value={ **item.to_dict(), "parsed": parse_text(item.text, text_format=text_format), @@ -78,7 +78,7 @@ def parse_response( output_list.append( construct_type_unchecked( - type_=ParsedResponseOutputMessage[TextFormatT], + type_=ParsedResponseOutputMessage, value={ **output.to_dict(), "content": content_list, @@ -130,7 +130,7 @@ def parse_response( output_list.append(output) return construct_type_unchecked( - type_=ParsedResponse[TextFormatT], + type_=ParsedResponse, value={ **response.to_dict(), "output": output_list, diff --git a/tests/lib/responses/test_responses.py b/tests/lib/responses/test_responses.py index 8e5f16df95..0c2638be2e 100644 --- a/tests/lib/responses/test_responses.py +++ b/tests/lib/responses/test_responses.py @@ -1,19 +1,46 @@ from __future__ import annotations +import gc from typing_extensions import TypeVar import pytest from respx import MockRouter +from pydantic_core import SchemaValidator from inline_snapshot import snapshot from openai import OpenAI, AsyncOpenAI +from openai._types import omit from openai._utils import assert_signatures_in_sync +from openai._models import construct_type_unchecked +from openai.types.responses import Response +from openai.lib._parsing._responses import parse_response from ...conftest import base_url from ..snapshots import make_snapshot_request _T = TypeVar("_T") + +def _minimal_response_data(output: object, *, status: str = "completed") -> dict[str, object]: + return { + "id": "resp_test", + "object": "response", + "created_at": 1, + "status": status, + "model": "gpt-4o-mini", + "output": output, + "parallel_tool_calls": True, + "tool_choice": "auto", + "tools": [], + } + + +def _minimal_response(output: object, *, status: str = "completed") -> Response: + return construct_type_unchecked( + type_=Response, + value=_minimal_response_data(output, status=status), + ) + # all the snapshots in this file are auto-generated from the live API # # you can update them with @@ -41,6 +68,41 @@ def test_output_text(client: OpenAI, respx_mock: MockRouter) -> None: ) +def test_parse_response_does_not_leak_schema_validators() -> None: + response = _minimal_response( + [ + { + "id": "msg_test", + "type": "message", + "status": "completed", + "role": "assistant", + "content": [ + { + "type": "output_text", + "annotations": [], + "text": "hello", + } + ], + } + ] + ) + + for _ in range(100): + parse_response(text_format=omit, input_tools=omit, response=response) + + for _ in range(100): + parse_response(text_format=omit, input_tools=omit, response=response) + + gc.collect() + validator_count = sum(1 for obj in gc.get_objects() if type(obj) is SchemaValidator) + + for _ in range(100): + parse_response(text_format=omit, input_tools=omit, response=response) + + gc.collect() + assert sum(1 for obj in gc.get_objects() if type(obj) is SchemaValidator) == validator_count + + @pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"]) def test_stream_method_definition_in_sync(sync: bool, client: OpenAI, async_client: AsyncOpenAI) -> None: checking_client: OpenAI | AsyncOpenAI = client if sync else async_client From c67cba3b812780415dc27c6cd8b9ff2579404ac4 Mon Sep 17 00:00:00 2001 From: redactdeveloper <283494121+redactdeveloper@users.noreply.github.com> Date: Mon, 1 Jun 2026 06:44:17 +0300 Subject: [PATCH 2/3] fix: bound vector store polling waits (closes #3097) --- src/openai/__init__.py | 2 + src/openai/_exceptions.py | 5 + .../resources/vector_stores/file_batches.py | 36 ++++++- src/openai/resources/vector_stores/files.py | 36 ++++++- tests/lib/test_vector_store_polling.py | 97 +++++++++++++++++++ 5 files changed, 172 insertions(+), 4 deletions(-) create mode 100644 tests/lib/test_vector_store_polling.py diff --git a/src/openai/__init__.py b/src/openai/__init__.py index cbaef0615f..90a2c5c81f 100644 --- a/src/openai/__init__.py +++ b/src/openai/__init__.py @@ -27,6 +27,7 @@ APIConnectionError, AuthenticationError, InternalServerError, + PollingTimeoutError, PermissionDeniedError, LengthFinishReasonError, WebSocketQueueFullError, @@ -68,6 +69,7 @@ "UnprocessableEntityError", "RateLimitError", "InternalServerError", + "PollingTimeoutError", "LengthFinishReasonError", "ContentFilterFinishReasonError", "InvalidWebhookSignatureError", diff --git a/src/openai/_exceptions.py b/src/openai/_exceptions.py index 86f44b0e15..e70dc04eb2 100644 --- a/src/openai/_exceptions.py +++ b/src/openai/_exceptions.py @@ -27,6 +27,7 @@ "LengthFinishReasonError", "ContentFilterFinishReasonError", "InvalidWebhookSignatureError", + "PollingTimeoutError", "SubjectTokenProviderError", "WebSocketConnectionClosedError", "WebSocketQueueFullError", @@ -37,6 +38,10 @@ class OpenAIError(Exception): pass +class PollingTimeoutError(OpenAIError): + pass + + class SubjectTokenProviderError(OpenAIError): response: httpx.Response | None diff --git a/src/openai/resources/vector_stores/file_batches.py b/src/openai/resources/vector_stores/file_batches.py index 4bde1a4aa6..0d424c9439 100644 --- a/src/openai/resources/vector_stores/file_batches.py +++ b/src/openai/resources/vector_stores/file_batches.py @@ -2,6 +2,7 @@ from __future__ import annotations +import time import asyncio from typing import Dict, Iterable, Optional from typing_extensions import Union, Literal @@ -18,6 +19,7 @@ from ..._resource import SyncAPIResource, AsyncAPIResource from ..._response import to_streamed_response_wrapper, async_to_streamed_response_wrapper from ...pagination import SyncCursorPage, AsyncCursorPage +from ..._exceptions import PollingTimeoutError from ..._base_client import AsyncPaginator, make_request_options from ...types.file_object import FileObject from ...types.vector_stores import file_batch_create_params, file_batch_list_files_params @@ -223,6 +225,7 @@ def create_and_poll( file_ids: SequenceNotStr[str] | Omit = omit, files: Iterable[file_batch_create_params.File] | Omit = omit, poll_interval_ms: int | Omit = omit, + max_wait_seconds: float | None = 600, # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs. # The extra values given here take precedence over values defined on the client or passed to this method. extra_headers: Headers | None = None, @@ -247,6 +250,7 @@ def create_and_poll( batch.id, vector_store_id=vector_store_id, poll_interval_ms=poll_interval_ms, + max_wait_seconds=max_wait_seconds, ) def list_files( @@ -334,6 +338,7 @@ def poll( *, vector_store_id: str, poll_interval_ms: int | Omit = omit, + max_wait_seconds: float | None = 600, ) -> VectorStoreFileBatch: """Wait for the given file batch to be processed. @@ -344,6 +349,7 @@ def poll( if is_given(poll_interval_ms): headers["X-Stainless-Custom-Poll-Interval"] = str(poll_interval_ms) + started_at = time.monotonic() while True: response = self.with_raw_response.retrieve( batch_id, @@ -353,6 +359,12 @@ def poll( batch = response.parse() if batch.file_counts.in_progress > 0: + elapsed = time.monotonic() - started_at + if max_wait_seconds is not None and elapsed >= max_wait_seconds: + raise PollingTimeoutError( + f"Polling timed out after {max_wait_seconds}s: file batch {batch_id} is still in_progress" + ) + if not is_given(poll_interval_ms): from_header = response.headers.get("openai-poll-after-ms") if from_header is not None: @@ -360,7 +372,10 @@ def poll( else: poll_interval_ms = 1000 - self._sleep(poll_interval_ms / 1000) + sleep_seconds = poll_interval_ms / 1000 + if max_wait_seconds is not None: + sleep_seconds = min(sleep_seconds, max_wait_seconds - elapsed) + self._sleep(sleep_seconds) continue return batch @@ -373,6 +388,7 @@ def upload_and_poll( max_concurrency: int = 5, file_ids: SequenceNotStr[str] = [], poll_interval_ms: int | Omit = omit, + max_wait_seconds: float | None = 600, chunking_strategy: FileChunkingStrategyParam | Omit = omit, ) -> VectorStoreFileBatch: """Uploads the given files concurrently and then creates a vector store file batch. @@ -411,6 +427,7 @@ def upload_and_poll( vector_store_id=vector_store_id, file_ids=[*file_ids, *(f.id for f in results)], poll_interval_ms=poll_interval_ms, + max_wait_seconds=max_wait_seconds, chunking_strategy=chunking_strategy, ) return batch @@ -611,6 +628,7 @@ async def create_and_poll( file_ids: SequenceNotStr[str] | Omit = omit, files: Iterable[file_batch_create_params.File] | Omit = omit, poll_interval_ms: int | Omit = omit, + max_wait_seconds: float | None = 600, # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs. # The extra values given here take precedence over values defined on the client or passed to this method. extra_headers: Headers | None = None, @@ -635,6 +653,7 @@ async def create_and_poll( batch.id, vector_store_id=vector_store_id, poll_interval_ms=poll_interval_ms, + max_wait_seconds=max_wait_seconds, ) def list_files( @@ -722,6 +741,7 @@ async def poll( *, vector_store_id: str, poll_interval_ms: int | Omit = omit, + max_wait_seconds: float | None = 600, ) -> VectorStoreFileBatch: """Wait for the given file batch to be processed. @@ -732,6 +752,7 @@ async def poll( if is_given(poll_interval_ms): headers["X-Stainless-Custom-Poll-Interval"] = str(poll_interval_ms) + started_at = time.monotonic() while True: response = await self.with_raw_response.retrieve( batch_id, @@ -741,6 +762,12 @@ async def poll( batch = response.parse() if batch.file_counts.in_progress > 0: + elapsed = time.monotonic() - started_at + if max_wait_seconds is not None and elapsed >= max_wait_seconds: + raise PollingTimeoutError( + f"Polling timed out after {max_wait_seconds}s: file batch {batch_id} is still in_progress" + ) + if not is_given(poll_interval_ms): from_header = response.headers.get("openai-poll-after-ms") if from_header is not None: @@ -748,7 +775,10 @@ async def poll( else: poll_interval_ms = 1000 - await self._sleep(poll_interval_ms / 1000) + sleep_seconds = poll_interval_ms / 1000 + if max_wait_seconds is not None: + sleep_seconds = min(sleep_seconds, max_wait_seconds - elapsed) + await self._sleep(sleep_seconds) continue return batch @@ -761,6 +791,7 @@ async def upload_and_poll( max_concurrency: int = 5, file_ids: SequenceNotStr[str] = [], poll_interval_ms: int | Omit = omit, + max_wait_seconds: float | None = 600, chunking_strategy: FileChunkingStrategyParam | Omit = omit, ) -> VectorStoreFileBatch: """Uploads the given files concurrently and then creates a vector store file batch. @@ -822,6 +853,7 @@ async def trio_upload_file(limiter: trio.CapacityLimiter, file: FileTypes) -> No vector_store_id=vector_store_id, file_ids=[*file_ids, *(f.id for f in uploaded_files)], poll_interval_ms=poll_interval_ms, + max_wait_seconds=max_wait_seconds, chunking_strategy=chunking_strategy, ) return batch diff --git a/src/openai/resources/vector_stores/files.py b/src/openai/resources/vector_stores/files.py index b7e1ea9f92..304528ebf9 100644 --- a/src/openai/resources/vector_stores/files.py +++ b/src/openai/resources/vector_stores/files.py @@ -2,6 +2,7 @@ from __future__ import annotations +import time from typing import TYPE_CHECKING, Dict, Union, Optional from typing_extensions import Literal, assert_never @@ -15,6 +16,7 @@ from ..._resource import SyncAPIResource, AsyncAPIResource from ..._response import to_streamed_response_wrapper, async_to_streamed_response_wrapper from ...pagination import SyncPage, AsyncPage, SyncCursorPage, AsyncCursorPage +from ..._exceptions import PollingTimeoutError from ..._base_client import AsyncPaginator, make_request_options from ...types.vector_stores import file_list_params, file_create_params, file_update_params from ...types.file_chunking_strategy_param import FileChunkingStrategyParam @@ -331,6 +333,7 @@ def create_and_poll( vector_store_id: str, attributes: Optional[Dict[str, Union[str, float, bool]]] | Omit = omit, poll_interval_ms: int | Omit = omit, + max_wait_seconds: float | None = 600, chunking_strategy: FileChunkingStrategyParam | Omit = omit, # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs. # The extra values given here take precedence over values defined on the client or passed to this method. @@ -355,6 +358,7 @@ def create_and_poll( file_id, vector_store_id=vector_store_id, poll_interval_ms=poll_interval_ms, + max_wait_seconds=max_wait_seconds, ) def poll( @@ -363,6 +367,7 @@ def poll( *, vector_store_id: str, poll_interval_ms: int | Omit = omit, + max_wait_seconds: float | None = 600, ) -> VectorStoreFile: """Wait for the vector store file to finish processing. @@ -373,6 +378,7 @@ def poll( if is_given(poll_interval_ms): headers["X-Stainless-Custom-Poll-Interval"] = str(poll_interval_ms) + started_at = time.monotonic() while True: response = self.with_raw_response.retrieve( file_id, @@ -382,6 +388,12 @@ def poll( file = response.parse() if file.status == "in_progress": + elapsed = time.monotonic() - started_at + if max_wait_seconds is not None and elapsed >= max_wait_seconds: + raise PollingTimeoutError( + f"Polling timed out after {max_wait_seconds}s: file {file_id} is still in_progress" + ) + if not is_given(poll_interval_ms): from_header = response.headers.get("openai-poll-after-ms") if from_header is not None: @@ -389,7 +401,10 @@ def poll( else: poll_interval_ms = 1000 - self._sleep(poll_interval_ms / 1000) + sleep_seconds = poll_interval_ms / 1000 + if max_wait_seconds is not None: + sleep_seconds = min(sleep_seconds, max_wait_seconds - elapsed) + self._sleep(sleep_seconds) elif file.status == "cancelled" or file.status == "completed" or file.status == "failed": return file else: @@ -420,6 +435,7 @@ def upload_and_poll( file: FileTypes, attributes: Optional[Dict[str, Union[str, float, bool]]] | Omit = omit, poll_interval_ms: int | Omit = omit, + max_wait_seconds: float | None = 600, chunking_strategy: FileChunkingStrategyParam | Omit = omit, ) -> VectorStoreFile: """Add a file to a vector store and poll until processing is complete.""" @@ -429,6 +445,7 @@ def upload_and_poll( file_id=file_obj.id, chunking_strategy=chunking_strategy, poll_interval_ms=poll_interval_ms, + max_wait_seconds=max_wait_seconds, attributes=attributes, ) @@ -785,6 +802,7 @@ async def create_and_poll( vector_store_id: str, attributes: Optional[Dict[str, Union[str, float, bool]]] | Omit = omit, poll_interval_ms: int | Omit = omit, + max_wait_seconds: float | None = 600, chunking_strategy: FileChunkingStrategyParam | Omit = omit, # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs. # The extra values given here take precedence over values defined on the client or passed to this method. @@ -809,6 +827,7 @@ async def create_and_poll( file_id, vector_store_id=vector_store_id, poll_interval_ms=poll_interval_ms, + max_wait_seconds=max_wait_seconds, ) async def poll( @@ -817,6 +836,7 @@ async def poll( *, vector_store_id: str, poll_interval_ms: int | Omit = omit, + max_wait_seconds: float | None = 600, ) -> VectorStoreFile: """Wait for the vector store file to finish processing. @@ -827,6 +847,7 @@ async def poll( if is_given(poll_interval_ms): headers["X-Stainless-Custom-Poll-Interval"] = str(poll_interval_ms) + started_at = time.monotonic() while True: response = await self.with_raw_response.retrieve( file_id, @@ -836,6 +857,12 @@ async def poll( file = response.parse() if file.status == "in_progress": + elapsed = time.monotonic() - started_at + if max_wait_seconds is not None and elapsed >= max_wait_seconds: + raise PollingTimeoutError( + f"Polling timed out after {max_wait_seconds}s: file {file_id} is still in_progress" + ) + if not is_given(poll_interval_ms): from_header = response.headers.get("openai-poll-after-ms") if from_header is not None: @@ -843,7 +870,10 @@ async def poll( else: poll_interval_ms = 1000 - await self._sleep(poll_interval_ms / 1000) + sleep_seconds = poll_interval_ms / 1000 + if max_wait_seconds is not None: + sleep_seconds = min(sleep_seconds, max_wait_seconds - elapsed) + await self._sleep(sleep_seconds) elif file.status == "cancelled" or file.status == "completed" or file.status == "failed": return file else: @@ -876,6 +906,7 @@ async def upload_and_poll( file: FileTypes, attributes: Optional[Dict[str, Union[str, float, bool]]] | Omit = omit, poll_interval_ms: int | Omit = omit, + max_wait_seconds: float | None = 600, chunking_strategy: FileChunkingStrategyParam | Omit = omit, ) -> VectorStoreFile: """Add a file to a vector store and poll until processing is complete.""" @@ -884,6 +915,7 @@ async def upload_and_poll( vector_store_id=vector_store_id, file_id=file_obj.id, poll_interval_ms=poll_interval_ms, + max_wait_seconds=max_wait_seconds, chunking_strategy=chunking_strategy, attributes=attributes, ) diff --git a/tests/lib/test_vector_store_polling.py b/tests/lib/test_vector_store_polling.py new file mode 100644 index 0000000000..17a2e79f6f --- /dev/null +++ b/tests/lib/test_vector_store_polling.py @@ -0,0 +1,97 @@ +from __future__ import annotations + +import pytest + +from openai import PollingTimeoutError +from openai._models import construct_type_unchecked +from openai.resources.vector_stores.files import Files, AsyncFiles +from openai.resources.vector_stores.file_batches import FileBatches, AsyncFileBatches +from openai.types.vector_stores.vector_store_file import VectorStoreFile +from openai.types.vector_stores.vector_store_file_batch import VectorStoreFileBatch + + +class _RawResponse: + headers: dict[str, str] = {} + + def __init__(self, value: object) -> None: + self._value = value + + def parse(self) -> object: + return self._value + + +class _RawResponses: + def __init__(self, value: object) -> None: + self._value = value + + def retrieve(self, *_args: object, **_kwargs: object) -> _RawResponse: + return _RawResponse(self._value) + + +class _AsyncRawResponses: + def __init__(self, value: object) -> None: + self._value = value + + async def retrieve(self, *_args: object, **_kwargs: object) -> _RawResponse: + return _RawResponse(self._value) + + +def _in_progress_file() -> VectorStoreFile: + return construct_type_unchecked( + type_=VectorStoreFile, + value={ + "id": "file_123", + "created_at": 1, + "last_error": None, + "object": "vector_store.file", + "status": "in_progress", + "usage_bytes": 0, + "vector_store_id": "vs_123", + }, + ) + + +def _in_progress_batch() -> VectorStoreFileBatch: + return construct_type_unchecked( + type_=VectorStoreFileBatch, + value={ + "id": "batch_123", + "created_at": 1, + "file_counts": {"cancelled": 0, "completed": 0, "failed": 0, "in_progress": 1, "total": 1}, + "object": "vector_store.files_batch", + "status": "in_progress", + "vector_store_id": "vs_123", + }, + ) + + +def test_vector_store_file_poll_times_out() -> None: + files = object.__new__(Files) + files.with_raw_response = _RawResponses(_in_progress_file()) + + with pytest.raises(PollingTimeoutError, match="file file_123 is still in_progress"): + files.poll("file_123", vector_store_id="vs_123", max_wait_seconds=0) + + +async def test_async_vector_store_file_poll_times_out() -> None: + files = object.__new__(AsyncFiles) + files.with_raw_response = _AsyncRawResponses(_in_progress_file()) + + with pytest.raises(PollingTimeoutError, match="file file_123 is still in_progress"): + await files.poll("file_123", vector_store_id="vs_123", max_wait_seconds=0) + + +def test_vector_store_file_batch_poll_times_out() -> None: + batches = object.__new__(FileBatches) + batches.with_raw_response = _RawResponses(_in_progress_batch()) + + with pytest.raises(PollingTimeoutError, match="file batch batch_123 is still in_progress"): + batches.poll("batch_123", vector_store_id="vs_123", max_wait_seconds=0) + + +async def test_async_vector_store_file_batch_poll_times_out() -> None: + batches = object.__new__(AsyncFileBatches) + batches.with_raw_response = _AsyncRawResponses(_in_progress_batch()) + + with pytest.raises(PollingTimeoutError, match="file batch batch_123 is still in_progress"): + await batches.poll("batch_123", vector_store_id="vs_123", max_wait_seconds=0) From f436081ad543586ffe2ce726b233cc905371e8ce Mon Sep 17 00:00:00 2001 From: redactdeveloper <283494121+redactdeveloper@users.noreply.github.com> Date: Mon, 1 Jun 2026 06:44:26 +0300 Subject: [PATCH 3/3] fix: preserve response compaction output items (closes #3075) --- tests/lib/responses/test_responses.py | 42 ++++++++++++++++++++++++++- 1 file changed, 41 insertions(+), 1 deletion(-) diff --git a/tests/lib/responses/test_responses.py b/tests/lib/responses/test_responses.py index 0c2638be2e..79faee0fc0 100644 --- a/tests/lib/responses/test_responses.py +++ b/tests/lib/responses/test_responses.py @@ -12,8 +12,9 @@ from openai._types import omit from openai._utils import assert_signatures_in_sync from openai._models import construct_type_unchecked -from openai.types.responses import Response +from openai.types.responses import Response, ResponseCreatedEvent, ResponseCompletedEvent from openai.lib._parsing._responses import parse_response +from openai.lib.streaming.responses._responses import ResponseStreamState from ...conftest import base_url from ..snapshots import make_snapshot_request @@ -103,6 +104,45 @@ def test_parse_response_does_not_leak_schema_validators() -> None: assert sum(1 for obj in gc.get_objects() if type(obj) is SchemaValidator) == validator_count +def test_response_stream_completed_preserves_function_call_and_compaction_items() -> None: + state = ResponseStreamState(text_format=omit, input_tools=omit) + created_response = _minimal_response([], status="in_progress") + completed_response = _minimal_response( + [ + { + "id": "fc_test", + "type": "function_call", + "call_id": "call_test", + "name": "lookup", + "arguments": "{}", + "status": "completed", + }, + { + "id": "cmp_test", + "type": "compaction", + "encrypted_content": "encrypted", + }, + ] + ) + + state.handle_event( + construct_type_unchecked( + type_=ResponseCreatedEvent, + value={"type": "response.created", "sequence_number": 0, "response": created_response}, + ) + ) + events = state.handle_event( + construct_type_unchecked( + type_=ResponseCompletedEvent, + value={"type": "response.completed", "sequence_number": 1, "response": completed_response}, + ) + ) + + completed_event = events[0] + assert completed_event.type == "response.completed" + assert [item.type for item in completed_event.response.output] == ["function_call", "compaction"] + + @pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"]) def test_stream_method_definition_in_sync(sync: bool, client: OpenAI, async_client: AsyncOpenAI) -> None: checking_client: OpenAI | AsyncOpenAI = client if sync else async_client