Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions src/openai/resources/vector_stores/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from typing import TYPE_CHECKING, Dict, Union, Optional
from typing_extensions import Literal, assert_never

import time

import httpx

from ... import _legacy_response
Expand Down Expand Up @@ -314,6 +316,7 @@ def create_and_poll(
vector_store_id: str,
attributes: Optional[Dict[str, Union[str, float, bool]]] | Omit = omit,
poll_interval_ms: int | Omit = omit,
poll_timeout_ms: float | None | Omit = omit,
chunking_strategy: FileChunkingStrategyParam | Omit = omit,
# Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
# The extra values given here take precedence over values defined on the client or passed to this method.
Expand All @@ -338,6 +341,7 @@ def create_and_poll(
file_id,
vector_store_id=vector_store_id,
poll_interval_ms=poll_interval_ms,
poll_timeout_ms=poll_timeout_ms,
)

def poll(
Expand All @@ -346,6 +350,7 @@ def poll(
*,
vector_store_id: str,
poll_interval_ms: int | Omit = omit,
poll_timeout_ms: float | None | Omit = omit,
) -> VectorStoreFile:
"""Wait for the vector store file to finish processing.

Expand All @@ -356,6 +361,7 @@ def poll(
if is_given(poll_interval_ms):
headers["X-Stainless-Custom-Poll-Interval"] = str(poll_interval_ms)

start = time.monotonic()
while True:
response = self.with_raw_response.retrieve(
file_id,
Expand All @@ -372,6 +378,12 @@ def poll(
else:
poll_interval_ms = 1000

if is_given(poll_timeout_ms) and poll_timeout_ms is not None:
if (time.monotonic() - start) * 1000 >= poll_timeout_ms:
raise TimeoutError(
f"Vector store file {file_id!r} is still in_progress after {poll_timeout_ms:.0f}ms"
)

self._sleep(poll_interval_ms / 1000)
elif file.status == "cancelled" or file.status == "completed" or file.status == "failed":
return file
Expand Down Expand Up @@ -403,6 +415,7 @@ def upload_and_poll(
file: FileTypes,
attributes: Optional[Dict[str, Union[str, float, bool]]] | Omit = omit,
poll_interval_ms: int | Omit = omit,
poll_timeout_ms: float | None | Omit = omit,
chunking_strategy: FileChunkingStrategyParam | Omit = omit,
) -> VectorStoreFile:
"""Add a file to a vector store and poll until processing is complete."""
Expand All @@ -412,6 +425,7 @@ def upload_and_poll(
file_id=file_obj.id,
chunking_strategy=chunking_strategy,
poll_interval_ms=poll_interval_ms,
poll_timeout_ms=poll_timeout_ms,
attributes=attributes,
)

Expand Down Expand Up @@ -747,6 +761,7 @@ async def create_and_poll(
vector_store_id: str,
attributes: Optional[Dict[str, Union[str, float, bool]]] | Omit = omit,
poll_interval_ms: int | Omit = omit,
poll_timeout_ms: float | None | Omit = omit,
chunking_strategy: FileChunkingStrategyParam | Omit = omit,
# Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
# The extra values given here take precedence over values defined on the client or passed to this method.
Expand All @@ -771,6 +786,14 @@ async def create_and_poll(
file_id,
vector_store_id=vector_store_id,
poll_interval_ms=poll_interval_ms,
poll_timeout_ms=poll_timeout_ms,
)

return await self.poll(
file_id,
vector_store_id=vector_store_id,
poll_interval_ms=poll_interval_ms,
poll_timeout_ms=poll_timeout_ms,
)

async def poll(
Expand All @@ -779,6 +802,7 @@ async def poll(
*,
vector_store_id: str,
poll_interval_ms: int | Omit = omit,
poll_timeout_ms: float | None | Omit = omit,
) -> VectorStoreFile:
"""Wait for the vector store file to finish processing.

Expand All @@ -789,6 +813,7 @@ async def poll(
if is_given(poll_interval_ms):
headers["X-Stainless-Custom-Poll-Interval"] = str(poll_interval_ms)

start = time.monotonic()
while True:
response = await self.with_raw_response.retrieve(
file_id,
Expand All @@ -805,6 +830,12 @@ async def poll(
else:
poll_interval_ms = 1000

if is_given(poll_timeout_ms) and poll_timeout_ms is not None:
if (time.monotonic() - start) * 1000 >= poll_timeout_ms:
raise TimeoutError(
f"Vector store file {file_id!r} is still in_progress after {poll_timeout_ms:.0f}ms"
)

await self._sleep(poll_interval_ms / 1000)
elif file.status == "cancelled" or file.status == "completed" or file.status == "failed":
return file
Expand Down Expand Up @@ -838,6 +869,7 @@ async def upload_and_poll(
file: FileTypes,
attributes: Optional[Dict[str, Union[str, float, bool]]] | Omit = omit,
poll_interval_ms: int | Omit = omit,
poll_timeout_ms: float | None | Omit = omit,
chunking_strategy: FileChunkingStrategyParam | Omit = omit,
) -> VectorStoreFile:
"""Add a file to a vector store and poll until processing is complete."""
Expand All @@ -846,6 +878,7 @@ async def upload_and_poll(
vector_store_id=vector_store_id,
file_id=file_obj.id,
poll_interval_ms=poll_interval_ms,
poll_timeout_ms=poll_timeout_ms,
chunking_strategy=chunking_strategy,
attributes=attributes,
)
Expand Down