Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/verify_release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,8 @@ jobs:
dir
python -m pip install -r requirements-test.txt
echo "INSTALL_CMD=$env:INSTALL_CMD"
PIP_CMD="python -m pip $($env:INSTALL_CMD) couchbase-analytics==$($env:SDK_VERSION)"
$PIP_CMD="python -m pip $($env:INSTALL_CMD) couchbase-analytics==$($env:SDK_VERSION)"
iex $PIP_CMD
python -m pip list
$TEST_ACOUCHBASE_API="${{ fromJson(needs.setup.outputs.stage_matrices).test_unit.test_acouchbase_api }}"
if ( $TEST_ACOUCHBASE_API -eq "true" ) {
Expand Down
4 changes: 2 additions & 2 deletions couchbase_analytics/common/_core/error_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ def update_request_context(self, request: QueryRequest) -> None:
def update_response_context(self, response: HttpCoreResponse) -> None:
network_stream = response.extensions.get('network_stream', None)
if network_stream is not None:
addr, port = network_stream.get_extra_info('client_addr')
addr, port, *_ = network_stream.get_extra_info('client_addr')
self.last_dispatched_from = f'{addr}:{port}'
addr, port = network_stream.get_extra_info('server_addr')
addr, port, *_ = network_stream.get_extra_info('server_addr')
self.last_dispatched_to = f'{addr}:{port}'
self.status_code = response.status_code

Expand Down
10 changes: 7 additions & 3 deletions couchbase_analytics/protocol/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ def __init__(
# The RequestContext generates a future that enables some background processing
# Allow the default max_workers which is (as of Python 3.8): min(32, os.cpu_count() + 4).
# We can add an option later if we see a need
self._tp_executor = ThreadPoolExecutor()
self._tp_executor_prefix = f'pycbac-tpe-{self._cluster_id[:8]}'
self._tp_executor = ThreadPoolExecutor(thread_name_prefix=self._tp_executor_prefix)
self._client_adapter.log_message(f'Created ThreadPoolExecutor({self._tp_executor_prefix})', LogLevel.INFO)
self._tp_executor_shutdown_called = False
atexit.register(self._shutdown_executor)

Expand Down Expand Up @@ -84,8 +86,7 @@ def _shutdown(self) -> None:
"""
self._client_adapter.close_client()
self._client_adapter.reset_client()
if self._tp_executor_shutdown_called is False:
self._tp_executor.shutdown()
self._shutdown_executor()

def _create_client(self) -> None:
"""
Expand All @@ -95,6 +96,9 @@ def _create_client(self) -> None:

def _shutdown_executor(self) -> None:
if self._tp_executor_shutdown_called is False:
self._client_adapter.log_message(
f'Shutting down ThreadPoolExecutor({self._tp_executor_prefix})', LogLevel.INFO
)
self._tp_executor.shutdown()
self._tp_executor_shutdown_called = True

Expand Down