diff --git a/.github/workflows/verify_release.yml b/.github/workflows/verify_release.yml index 107817d..81d7c65 100644 --- a/.github/workflows/verify_release.yml +++ b/.github/workflows/verify_release.yml @@ -451,7 +451,8 @@ jobs: dir python -m pip install -r requirements-test.txt echo "INSTALL_CMD=$env:INSTALL_CMD" - PIP_CMD="python -m pip $($env:INSTALL_CMD) couchbase-analytics==$($env:SDK_VERSION)" + $PIP_CMD="python -m pip $($env:INSTALL_CMD) couchbase-analytics==$($env:SDK_VERSION)" + iex $PIP_CMD python -m pip list $TEST_ACOUCHBASE_API="${{ fromJson(needs.setup.outputs.stage_matrices).test_unit.test_acouchbase_api }}" if ( $TEST_ACOUCHBASE_API -eq "true" ) { diff --git a/couchbase_analytics/common/_core/error_context.py b/couchbase_analytics/common/_core/error_context.py index 6270fea..0e51248 100644 --- a/couchbase_analytics/common/_core/error_context.py +++ b/couchbase_analytics/common/_core/error_context.py @@ -56,9 +56,9 @@ def update_request_context(self, request: QueryRequest) -> None: def update_response_context(self, response: HttpCoreResponse) -> None: network_stream = response.extensions.get('network_stream', None) if network_stream is not None: - addr, port = network_stream.get_extra_info('client_addr') + addr, port, *_ = network_stream.get_extra_info('client_addr') self.last_dispatched_from = f'{addr}:{port}' - addr, port = network_stream.get_extra_info('server_addr') + addr, port, *_ = network_stream.get_extra_info('server_addr') self.last_dispatched_to = f'{addr}:{port}' self.status_code = response.status_code diff --git a/couchbase_analytics/protocol/cluster.py b/couchbase_analytics/protocol/cluster.py index 7b6a52a..366844a 100644 --- a/couchbase_analytics/protocol/cluster.py +++ b/couchbase_analytics/protocol/cluster.py @@ -46,7 +46,9 @@ def __init__( # The RequestContext generates a future that enables some background processing # Allow the default max_workers which is (as of Python 3.8): min(32, os.cpu_count() + 4). # We can add an option later if we see a need - self._tp_executor = ThreadPoolExecutor() + self._tp_executor_prefix = f'pycbac-tpe-{self._cluster_id[:8]}' + self._tp_executor = ThreadPoolExecutor(thread_name_prefix=self._tp_executor_prefix) + self._client_adapter.log_message(f'Created ThreadPoolExecutor({self._tp_executor_prefix})', LogLevel.INFO) self._tp_executor_shutdown_called = False atexit.register(self._shutdown_executor) @@ -84,8 +86,7 @@ def _shutdown(self) -> None: """ self._client_adapter.close_client() self._client_adapter.reset_client() - if self._tp_executor_shutdown_called is False: - self._tp_executor.shutdown() + self._shutdown_executor() def _create_client(self) -> None: """ @@ -95,6 +96,9 @@ def _create_client(self) -> None: def _shutdown_executor(self) -> None: if self._tp_executor_shutdown_called is False: + self._client_adapter.log_message( + f'Shutting down ThreadPoolExecutor({self._tp_executor_prefix})', LogLevel.INFO + ) self._tp_executor.shutdown() self._tp_executor_shutdown_called = True