Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ env:
CBCI_SUPPORTED_ARM64_PLATFORMS: "linux macos"
CBCI_DEFAULT_LINUX_X86_64_PLATFORM: "ubuntu-22.04"
CBCI_DEFAULT_LINUX_ARM64_PLATFORM: "ubuntu-22.04-arm"
CBCI_DEFAULT_MACOS_X86_64_PLATFORM: "macos-13"
CBCI_DEFAULT_MACOS_ARM64_PLATFORM: "macos-14"
CBCI_DEFAULT_MACOS_X86_64_PLATFORM: "macos-15-intel"
CBCI_DEFAULT_MACOS_ARM64_PLATFORM: "macos-15"
CBCI_DEFAULT_WINDOWS_PLATFORM: "windows-2022"
CBCI_DEFAULT_LINUX_CONTAINER: "slim-bookworm"
CBCI_DEFAULT_ALPINE_CONTAINER: "alpine"
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ env:
CBCI_SUPPORTED_ARM64_PLATFORMS: "linux macos"
CBCI_DEFAULT_LINUX_X86_64_PLATFORM: "ubuntu-22.04"
CBCI_DEFAULT_LINUX_ARM64_PLATFORM: "ubuntu-22.04-arm"
CBCI_DEFAULT_MACOS_X86_64_PLATFORM: "macos-13"
CBCI_DEFAULT_MACOS_ARM64_PLATFORM: "macos-14"
CBCI_DEFAULT_MACOS_X86_64_PLATFORM: "macos-15-intel"
CBCI_DEFAULT_MACOS_ARM64_PLATFORM: "macos-15"
CBCI_DEFAULT_WINDOWS_PLATFORM: "windows-2022"
CBCI_DEFAULT_LINUX_CONTAINER: "slim-bookworm"
CBCI_DEFAULT_ALPINE_CONTAINER: "alpine"
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/verify_release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ env:
CBCI_SUPPORTED_ARM64_PLATFORMS: "linux macos"
CBCI_DEFAULT_LINUX_X86_64_PLATFORM: "ubuntu-22.04"
CBCI_DEFAULT_LINUX_ARM64_PLATFORM: "ubuntu-22.04-arm"
CBCI_DEFAULT_MACOS_X86_64_PLATFORM: "macos-13"
CBCI_DEFAULT_MACOS_ARM64_PLATFORM: "macos-14"
CBCI_DEFAULT_MACOS_X86_64_PLATFORM: "macos-15-intel"
CBCI_DEFAULT_MACOS_ARM64_PLATFORM: "macos-15"
CBCI_DEFAULT_WINDOWS_PLATFORM: "windows-2022"
CBCI_DEFAULT_LINUX_CONTAINER: "slim-bookworm"
CBCI_DEFAULT_ALPINE_CONTAINER: "alpine"
Expand Down
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -176,5 +176,8 @@ gocaves*
.pytest_cache/
test_scripts/

# rff
# ruff
.ruff_cache/

# other
.DS_Store
2 changes: 2 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ repos:
- pytest~=8.3.5
- httpx~=0.28.1
- aiohttp~=3.11.10
- sniffio~=1.3.1
- anyio~=4.9.0
types:
- python
require_serial: true
Expand Down
19 changes: 18 additions & 1 deletion acouchbase_analytics/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
from typing import TypeAlias

from acouchbase_analytics.database import AsyncDatabase
from couchbase_analytics.result import AsyncQueryResult
from acouchbase_analytics.query_handle import AsyncQueryHandle
from acouchbase_analytics.result import AsyncQueryResult

if TYPE_CHECKING:
from couchbase_analytics.credential import Credential
Expand Down Expand Up @@ -143,6 +144,22 @@ def execute_query(self, statement: str, *args: object, **kwargs: object) -> Awai
""" # noqa: E501
return self._impl.execute_query(statement, *args, **kwargs)

def start_query(self, statement: str, *args: object, **kwargs: object) -> Awaitable[AsyncQueryHandle]:
"""Executes a query against an Analytics cluster in async mode.

.. seealso::
:meth:`acouchbase_analytics.Scope.start_query`: For how to execute scope-level queries.

Args:
statement: The SQL++ statement to execute.
options (:class:`~acouchbase_analytics.options.StartQueryOptions`): Optional parameters for the query operation.
**kwargs (Dict[str, Any]): keyword arguments that can be used in place or to override provided :class:`~acouchbase_analytics.options.StartQueryOptions`

Returns:
:class:`~acouchbase_analytics.query_handle.AsyncQueryHandle`: An instance of a :class:`~acouchbase_analytics.query_handle.AsyncQueryHandle`
""" # noqa: E501
return self._impl.start_query(statement, *args, **kwargs)

async def shutdown(self) -> None:
"""Shuts down this cluster instance. Cleaning up all resources associated with it.

Expand Down
41 changes: 35 additions & 6 deletions acouchbase_analytics/cluster.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,19 @@ if sys.version_info < (3, 11):
else:
from typing import Unpack

from acouchbase_analytics import JSONType
from acouchbase_analytics.credential import Credential
from acouchbase_analytics.database import AsyncDatabase
from couchbase_analytics.credential import Credential
from couchbase_analytics.options import ClusterOptions, ClusterOptionsKwargs, QueryOptions, QueryOptionsKwargs
from couchbase_analytics.result import AsyncQueryResult
from acouchbase_analytics.options import (
ClusterOptions,
ClusterOptionsKwargs,
QueryOptions,
QueryOptionsKwargs,
StartQueryOptions,
StartQueryOptionsKwargs,
)
from acouchbase_analytics.query_handle import AsyncQueryHandle
from acouchbase_analytics.result import AsyncQueryResult

class AsyncCluster:
@overload
Expand Down Expand Up @@ -54,14 +63,34 @@ class AsyncCluster:
) -> Awaitable[AsyncQueryResult]: ...
@overload
def execute_query(
self, statement: str, options: QueryOptions, *args: str, **kwargs: Unpack[QueryOptionsKwargs]
self, statement: str, options: QueryOptions, *args: JSONType, **kwargs: Unpack[QueryOptionsKwargs]
) -> Awaitable[AsyncQueryResult]: ...
@overload
def execute_query(
self, statement: str, options: QueryOptions, *args: str, **kwargs: str
self, statement: str, options: QueryOptions, *args: JSONType, **kwargs: str
) -> Awaitable[AsyncQueryResult]: ...
@overload
def execute_query(self, statement: str, *args: str, **kwargs: str) -> Awaitable[AsyncQueryResult]: ...
def execute_query(self, statement: str, *args: JSONType, **kwargs: str) -> Awaitable[AsyncQueryResult]: ...
@overload
def start_query(self, statement: str) -> AsyncQueryHandle: ...
@overload
def start_query(self, statement: str, options: StartQueryOptions) -> AsyncQueryHandle: ...
@overload
def start_query(self, statement: str, **kwargs: Unpack[StartQueryOptionsKwargs]) -> AsyncQueryHandle: ...
@overload
def start_query(
self, statement: str, options: StartQueryOptions, **kwargs: Unpack[StartQueryOptionsKwargs]
) -> AsyncQueryHandle: ...
@overload
def start_query(
self, statement: str, options: StartQueryOptions, *args: JSONType, **kwargs: Unpack[StartQueryOptionsKwargs]
) -> AsyncQueryHandle: ...
@overload
def start_query(
self, statement: str, options: StartQueryOptions, *args: JSONType, **kwargs: str
) -> AsyncQueryHandle: ...
@overload
def start_query(self, statement: str, *args: JSONType, **kwargs: str) -> AsyncQueryHandle: ...
def shutdown(self) -> Awaitable[None]: ...
@overload
@classmethod
Expand Down
4 changes: 4 additions & 0 deletions acouchbase_analytics/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@

from couchbase_analytics.common.options import ClusterOptions as ClusterOptions # noqa: F401
from couchbase_analytics.common.options import ClusterOptionsKwargs as ClusterOptionsKwargs # noqa: F401
from couchbase_analytics.common.options import FetchResultsOptions as FetchResultsOptions # noqa: F401
from couchbase_analytics.common.options import FetchResultsOptionsKwargs as FetchResultsOptionsKwargs # noqa: F401
from couchbase_analytics.common.options import QueryOptions as QueryOptions # noqa: F401
from couchbase_analytics.common.options import QueryOptionsKwargs as QueryOptionsKwargs # noqa: F401
from couchbase_analytics.common.options import SecurityOptions as SecurityOptions # noqa: F401
from couchbase_analytics.common.options import SecurityOptionsKwargs as SecurityOptionsKwargs # noqa: F401
from couchbase_analytics.common.options import StartQueryOptions as StartQueryOptions # noqa: F401
from couchbase_analytics.common.options import StartQueryOptionsKwargs as StartQueryOptionsKwargs # noqa: F401
from couchbase_analytics.common.options import TimeoutOptions as TimeoutOptions # noqa: F401
from couchbase_analytics.common.options import TimeoutOptionsKwargs as TimeoutOptionsKwargs # noqa: F401
2 changes: 1 addition & 1 deletion acouchbase_analytics/protocol/_core/anyio_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def current_async_library() -> Optional[AsyncBackend]:
try:
import sniffio
except ImportError:
async_lib = 'asyncio'
return AsyncBackend('asyncio')

try:
async_lib = sniffio.current_async_library()
Expand Down
25 changes: 18 additions & 7 deletions acouchbase_analytics/protocol/_core/client_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,18 @@
from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Optional, cast
from typing import Optional, Union, cast
from uuid import uuid4

from httpx import URL, AsyncClient, BasicAuth, Response

from couchbase_analytics.common.credential import Credential
from couchbase_analytics.common.deserializer import Deserializer
from couchbase_analytics.common.logging import LogLevel, log_message
from couchbase_analytics.protocol._core.request import CancelRequest, HttpRequest, QueryRequest, StartQueryRequest
from couchbase_analytics.protocol.connection import _ConnectionDetails
from couchbase_analytics.protocol.options import OptionsBuilder

if TYPE_CHECKING:
from couchbase_analytics.protocol._core.request import QueryRequest


class _AsyncClientAdapter:
"""
Expand Down Expand Up @@ -164,7 +162,9 @@ async def create_client(self) -> None:
def log_message(self, message: str, log_level: LogLevel) -> None:
log_message(logger, f'{self.log_prefix} {message}', log_level)

async def send_request(self, request: QueryRequest) -> Response:
async def send_request(
self, request: Union[CancelRequest, HttpRequest, QueryRequest, StartQueryRequest], stream: Optional[bool] = True
) -> Response:
"""
**INTERNAL**
"""
Expand All @@ -177,8 +177,19 @@ async def send_request(self, request: QueryRequest) -> Response:
port=request.url.port,
path=request.url.path,
)
req = self._client.build_request(request.method, url, json=request.body, extensions=request.extensions)
return await self._client.send(req, stream=True)
if isinstance(request, (QueryRequest, StartQueryRequest)):
req = self._client.build_request(request.method, url, json=request.body, extensions=request.extensions)
else:
headers = request.headers if request.headers is not None else None
data = request.data if isinstance(request, CancelRequest) else None
req = self._client.build_request(
request.method, url, data=data, headers=headers, extensions=request.extensions
)

if stream is None:
stream = True

return await self._client.send(req, stream=stream)

def reset_client(self) -> None:
"""
Expand Down
Loading
Loading