Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/openai/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
APIConnectionError,
AuthenticationError,
InternalServerError,
PollingTimeoutError,
PermissionDeniedError,
LengthFinishReasonError,
WebSocketQueueFullError,
Expand Down Expand Up @@ -68,6 +69,7 @@
"UnprocessableEntityError",
"RateLimitError",
"InternalServerError",
"PollingTimeoutError",
"LengthFinishReasonError",
"ContentFilterFinishReasonError",
"InvalidWebhookSignatureError",
Expand Down
5 changes: 5 additions & 0 deletions src/openai/_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
"LengthFinishReasonError",
"ContentFilterFinishReasonError",
"InvalidWebhookSignatureError",
"PollingTimeoutError",
"SubjectTokenProviderError",
"WebSocketConnectionClosedError",
"WebSocketQueueFullError",
Expand All @@ -37,6 +38,10 @@ class OpenAIError(Exception):
pass


class PollingTimeoutError(OpenAIError):
    """Raised when a polling helper gives up waiting for processing to finish.

    The vector store `poll()` helpers raise this once `max_wait_seconds` has
    elapsed and the polled file or file batch is still `in_progress`.
    """


class SubjectTokenProviderError(OpenAIError):
response: httpx.Response | None

Expand Down
6 changes: 3 additions & 3 deletions src/openai/lib/_parsing/_responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def parse_response(

content_list.append(
construct_type_unchecked(
type_=ParsedResponseOutputText[TextFormatT],
type_=ParsedResponseOutputText,
value={
**item.to_dict(),
"parsed": parse_text(item.text, text_format=text_format),
Expand All @@ -78,7 +78,7 @@ def parse_response(

output_list.append(
construct_type_unchecked(
type_=ParsedResponseOutputMessage[TextFormatT],
type_=ParsedResponseOutputMessage,
value={
**output.to_dict(),
"content": content_list,
Expand Down Expand Up @@ -130,7 +130,7 @@ def parse_response(
output_list.append(output)

return construct_type_unchecked(
type_=ParsedResponse[TextFormatT],
type_=ParsedResponse,
value={
**response.to_dict(),
"output": output_list,
Expand Down
36 changes: 34 additions & 2 deletions src/openai/resources/vector_stores/file_batches.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

import time
import asyncio
from typing import Dict, Iterable, Optional
from typing_extensions import Union, Literal
Expand All @@ -18,6 +19,7 @@
from ..._resource import SyncAPIResource, AsyncAPIResource
from ..._response import to_streamed_response_wrapper, async_to_streamed_response_wrapper
from ...pagination import SyncCursorPage, AsyncCursorPage
from ..._exceptions import PollingTimeoutError
from ..._base_client import AsyncPaginator, make_request_options
from ...types.file_object import FileObject
from ...types.vector_stores import file_batch_create_params, file_batch_list_files_params
Expand Down Expand Up @@ -223,6 +225,7 @@ def create_and_poll(
file_ids: SequenceNotStr[str] | Omit = omit,
files: Iterable[file_batch_create_params.File] | Omit = omit,
poll_interval_ms: int | Omit = omit,
max_wait_seconds: float | None = 600,
# Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
# The extra values given here take precedence over values defined on the client or passed to this method.
extra_headers: Headers | None = None,
Expand All @@ -247,6 +250,7 @@ def create_and_poll(
batch.id,
vector_store_id=vector_store_id,
poll_interval_ms=poll_interval_ms,
max_wait_seconds=max_wait_seconds,
)

def list_files(
Expand Down Expand Up @@ -334,6 +338,7 @@ def poll(
*,
vector_store_id: str,
poll_interval_ms: int | Omit = omit,
max_wait_seconds: float | None = 600,
) -> VectorStoreFileBatch:
"""Wait for the given file batch to be processed.

Expand All @@ -344,6 +349,7 @@ def poll(
if is_given(poll_interval_ms):
headers["X-Stainless-Custom-Poll-Interval"] = str(poll_interval_ms)

started_at = time.monotonic()
while True:
response = self.with_raw_response.retrieve(
batch_id,
Expand All @@ -353,14 +359,23 @@ def poll(

batch = response.parse()
if batch.file_counts.in_progress > 0:
elapsed = time.monotonic() - started_at
if max_wait_seconds is not None and elapsed >= max_wait_seconds:
raise PollingTimeoutError(
f"Polling timed out after {max_wait_seconds}s: file batch {batch_id} is still in_progress"
)

if not is_given(poll_interval_ms):
from_header = response.headers.get("openai-poll-after-ms")
if from_header is not None:
poll_interval_ms = int(from_header)
else:
poll_interval_ms = 1000

self._sleep(poll_interval_ms / 1000)
sleep_seconds = poll_interval_ms / 1000
if max_wait_seconds is not None:
sleep_seconds = min(sleep_seconds, max_wait_seconds - elapsed)
self._sleep(sleep_seconds)
continue

return batch
Expand All @@ -373,6 +388,7 @@ def upload_and_poll(
max_concurrency: int = 5,
file_ids: SequenceNotStr[str] = [],
poll_interval_ms: int | Omit = omit,
max_wait_seconds: float | None = 600,
chunking_strategy: FileChunkingStrategyParam | Omit = omit,
) -> VectorStoreFileBatch:
"""Uploads the given files concurrently and then creates a vector store file batch.
Expand Down Expand Up @@ -411,6 +427,7 @@ def upload_and_poll(
vector_store_id=vector_store_id,
file_ids=[*file_ids, *(f.id for f in results)],
poll_interval_ms=poll_interval_ms,
max_wait_seconds=max_wait_seconds,
chunking_strategy=chunking_strategy,
)
return batch
Expand Down Expand Up @@ -611,6 +628,7 @@ async def create_and_poll(
file_ids: SequenceNotStr[str] | Omit = omit,
files: Iterable[file_batch_create_params.File] | Omit = omit,
poll_interval_ms: int | Omit = omit,
max_wait_seconds: float | None = 600,
# Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
# The extra values given here take precedence over values defined on the client or passed to this method.
extra_headers: Headers | None = None,
Expand All @@ -635,6 +653,7 @@ async def create_and_poll(
batch.id,
vector_store_id=vector_store_id,
poll_interval_ms=poll_interval_ms,
max_wait_seconds=max_wait_seconds,
)

def list_files(
Expand Down Expand Up @@ -722,6 +741,7 @@ async def poll(
*,
vector_store_id: str,
poll_interval_ms: int | Omit = omit,
max_wait_seconds: float | None = 600,
) -> VectorStoreFileBatch:
"""Wait for the given file batch to be processed.

Expand All @@ -732,6 +752,7 @@ async def poll(
if is_given(poll_interval_ms):
headers["X-Stainless-Custom-Poll-Interval"] = str(poll_interval_ms)

started_at = time.monotonic()
while True:
response = await self.with_raw_response.retrieve(
batch_id,
Expand All @@ -741,14 +762,23 @@ async def poll(

batch = response.parse()
if batch.file_counts.in_progress > 0:
elapsed = time.monotonic() - started_at
if max_wait_seconds is not None and elapsed >= max_wait_seconds:
raise PollingTimeoutError(
f"Polling timed out after {max_wait_seconds}s: file batch {batch_id} is still in_progress"
)

if not is_given(poll_interval_ms):
from_header = response.headers.get("openai-poll-after-ms")
if from_header is not None:
poll_interval_ms = int(from_header)
else:
poll_interval_ms = 1000

await self._sleep(poll_interval_ms / 1000)
sleep_seconds = poll_interval_ms / 1000
if max_wait_seconds is not None:
sleep_seconds = min(sleep_seconds, max_wait_seconds - elapsed)
await self._sleep(sleep_seconds)
continue

return batch
Expand All @@ -761,6 +791,7 @@ async def upload_and_poll(
max_concurrency: int = 5,
file_ids: SequenceNotStr[str] = [],
poll_interval_ms: int | Omit = omit,
max_wait_seconds: float | None = 600,
chunking_strategy: FileChunkingStrategyParam | Omit = omit,
) -> VectorStoreFileBatch:
"""Uploads the given files concurrently and then creates a vector store file batch.
Expand Down Expand Up @@ -822,6 +853,7 @@ async def trio_upload_file(limiter: trio.CapacityLimiter, file: FileTypes) -> No
vector_store_id=vector_store_id,
file_ids=[*file_ids, *(f.id for f in uploaded_files)],
poll_interval_ms=poll_interval_ms,
max_wait_seconds=max_wait_seconds,
chunking_strategy=chunking_strategy,
)
return batch
Expand Down
36 changes: 34 additions & 2 deletions src/openai/resources/vector_stores/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

import time
from typing import TYPE_CHECKING, Dict, Union, Optional
from typing_extensions import Literal, assert_never

Expand All @@ -15,6 +16,7 @@
from ..._resource import SyncAPIResource, AsyncAPIResource
from ..._response import to_streamed_response_wrapper, async_to_streamed_response_wrapper
from ...pagination import SyncPage, AsyncPage, SyncCursorPage, AsyncCursorPage
from ..._exceptions import PollingTimeoutError
from ..._base_client import AsyncPaginator, make_request_options
from ...types.vector_stores import file_list_params, file_create_params, file_update_params
from ...types.file_chunking_strategy_param import FileChunkingStrategyParam
Expand Down Expand Up @@ -331,6 +333,7 @@ def create_and_poll(
vector_store_id: str,
attributes: Optional[Dict[str, Union[str, float, bool]]] | Omit = omit,
poll_interval_ms: int | Omit = omit,
max_wait_seconds: float | None = 600,
chunking_strategy: FileChunkingStrategyParam | Omit = omit,
# Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
# The extra values given here take precedence over values defined on the client or passed to this method.
Expand All @@ -355,6 +358,7 @@ def create_and_poll(
file_id,
vector_store_id=vector_store_id,
poll_interval_ms=poll_interval_ms,
max_wait_seconds=max_wait_seconds,
)

def poll(
Expand All @@ -363,6 +367,7 @@ def poll(
*,
vector_store_id: str,
poll_interval_ms: int | Omit = omit,
max_wait_seconds: float | None = 600,
) -> VectorStoreFile:
"""Wait for the vector store file to finish processing.

Expand All @@ -373,6 +378,7 @@ def poll(
if is_given(poll_interval_ms):
headers["X-Stainless-Custom-Poll-Interval"] = str(poll_interval_ms)

started_at = time.monotonic()
while True:
response = self.with_raw_response.retrieve(
file_id,
Expand All @@ -382,14 +388,23 @@ def poll(

file = response.parse()
if file.status == "in_progress":
elapsed = time.monotonic() - started_at
if max_wait_seconds is not None and elapsed >= max_wait_seconds:
raise PollingTimeoutError(
f"Polling timed out after {max_wait_seconds}s: file {file_id} is still in_progress"
)

if not is_given(poll_interval_ms):
from_header = response.headers.get("openai-poll-after-ms")
if from_header is not None:
poll_interval_ms = int(from_header)
else:
poll_interval_ms = 1000

self._sleep(poll_interval_ms / 1000)
sleep_seconds = poll_interval_ms / 1000
if max_wait_seconds is not None:
sleep_seconds = min(sleep_seconds, max_wait_seconds - elapsed)
self._sleep(sleep_seconds)
elif file.status == "cancelled" or file.status == "completed" or file.status == "failed":
return file
else:
Expand Down Expand Up @@ -420,6 +435,7 @@ def upload_and_poll(
file: FileTypes,
attributes: Optional[Dict[str, Union[str, float, bool]]] | Omit = omit,
poll_interval_ms: int | Omit = omit,
max_wait_seconds: float | None = 600,
chunking_strategy: FileChunkingStrategyParam | Omit = omit,
) -> VectorStoreFile:
"""Add a file to a vector store and poll until processing is complete."""
Expand All @@ -429,6 +445,7 @@ def upload_and_poll(
file_id=file_obj.id,
chunking_strategy=chunking_strategy,
poll_interval_ms=poll_interval_ms,
max_wait_seconds=max_wait_seconds,
attributes=attributes,
)

Expand Down Expand Up @@ -785,6 +802,7 @@ async def create_and_poll(
vector_store_id: str,
attributes: Optional[Dict[str, Union[str, float, bool]]] | Omit = omit,
poll_interval_ms: int | Omit = omit,
max_wait_seconds: float | None = 600,
chunking_strategy: FileChunkingStrategyParam | Omit = omit,
# Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
# The extra values given here take precedence over values defined on the client or passed to this method.
Expand All @@ -809,6 +827,7 @@ async def create_and_poll(
file_id,
vector_store_id=vector_store_id,
poll_interval_ms=poll_interval_ms,
max_wait_seconds=max_wait_seconds,
)

async def poll(
Expand All @@ -817,6 +836,7 @@ async def poll(
*,
vector_store_id: str,
poll_interval_ms: int | Omit = omit,
max_wait_seconds: float | None = 600,
) -> VectorStoreFile:
"""Wait for the vector store file to finish processing.

Expand All @@ -827,6 +847,7 @@ async def poll(
if is_given(poll_interval_ms):
headers["X-Stainless-Custom-Poll-Interval"] = str(poll_interval_ms)

started_at = time.monotonic()
while True:
response = await self.with_raw_response.retrieve(
file_id,
Expand All @@ -836,14 +857,23 @@ async def poll(

file = response.parse()
if file.status == "in_progress":
elapsed = time.monotonic() - started_at
if max_wait_seconds is not None and elapsed >= max_wait_seconds:
raise PollingTimeoutError(
f"Polling timed out after {max_wait_seconds}s: file {file_id} is still in_progress"
)

if not is_given(poll_interval_ms):
from_header = response.headers.get("openai-poll-after-ms")
if from_header is not None:
poll_interval_ms = int(from_header)
else:
poll_interval_ms = 1000

await self._sleep(poll_interval_ms / 1000)
sleep_seconds = poll_interval_ms / 1000
if max_wait_seconds is not None:
sleep_seconds = min(sleep_seconds, max_wait_seconds - elapsed)
await self._sleep(sleep_seconds)
elif file.status == "cancelled" or file.status == "completed" or file.status == "failed":
return file
else:
Expand Down Expand Up @@ -876,6 +906,7 @@ async def upload_and_poll(
file: FileTypes,
attributes: Optional[Dict[str, Union[str, float, bool]]] | Omit = omit,
poll_interval_ms: int | Omit = omit,
max_wait_seconds: float | None = 600,
chunking_strategy: FileChunkingStrategyParam | Omit = omit,
) -> VectorStoreFile:
"""Add a file to a vector store and poll until processing is complete."""
Expand All @@ -884,6 +915,7 @@ async def upload_and_poll(
vector_store_id=vector_store_id,
file_id=file_obj.id,
poll_interval_ms=poll_interval_ms,
max_wait_seconds=max_wait_seconds,
chunking_strategy=chunking_strategy,
attributes=attributes,
)
Expand Down
Loading