diff --git a/CHANGELOG.md b/CHANGELOG.md index 4537ad3f8bc..d8138ce350b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -48,6 +48,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#4979](https://github.com/open-telemetry/opentelemetry-python/pull/4979)) - `opentelemetry-sdk`: Map Python `CRITICAL` log level to OTel `FATAL` severity text per the specification ([#4984](https://github.com/open-telemetry/opentelemetry-python/issues/4984)) +- `opentelemetry-sdk`: Fix `BatchProcessor.force_flush` to respect `timeout_millis`; previously the timeout was ignored and the flush would block until all telemetry was exported + ([#4982](https://github.com/open-telemetry/opentelemetry-python/pull/4982)) - `opentelemetry-sdk`: Add file configuration support with YAML/JSON loading, environment variable substitution, and schema validation against the vendored OTel config JSON schema ([#4898](https://github.com/open-telemetry/opentelemetry-python/pull/4898)) - Fix intermittent CI failures in `getting-started` and `tracecontext` jobs caused by GitHub git CDN SHA propagation lag by installing contrib packages from the already-checked-out local copy instead of a second git clone diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py index cde19165d62..f18aec2e9ff 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py @@ -18,6 +18,7 @@ import enum import inspect import logging +import math import os import threading import time @@ -176,12 +177,19 @@ def worker(self): self._worker_awaken.clear() self._export(BatchExportStrategy.EXPORT_ALL) - def _export(self, batch_strategy: BatchExportStrategy) -> None: + def _export( + self, + batch_strategy: BatchExportStrategy, + deadline: float = math.inf, + ) -> bool: + # Returns True if all batches were exported, 
False if deadline was reached. with self._export_lock: iteration = 0 # We could see concurrent export calls from worker and force_flush. We call _should_export_batch # once the lock is obtained to see if we still need to make the requested export. while self._should_export_batch(batch_strategy, iteration): + if time.time() >= deadline: + return False iteration += 1 token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)) error: Exception | None = None @@ -206,6 +214,7 @@ def _export(self, batch_strategy: BatchExportStrategy) -> None: finally: self._metrics.finish_items(count, error) detach(token) + return True def emit(self, data: Telemetry) -> None: if self._shutdown: @@ -248,10 +257,13 @@ def shutdown(self, timeout_millis: int = 30000): # call is ongoing and the thread isn't finished. In this case we will return instead of waiting on # the thread to finish. - # TODO: Fix force flush so the timeout is used https://github.com/open-telemetry/opentelemetry-python/issues/4568. def force_flush(self, timeout_millis: Optional[int] = None) -> bool: if self._shutdown: return False + deadline = ( + time.time() + (timeout_millis / 1000) + if timeout_millis is not None + else math.inf + ) # Blocking call to export. 
- self._export(BatchExportStrategy.EXPORT_ALL) - return True + return self._export(BatchExportStrategy.EXPORT_ALL, deadline) diff --git a/opentelemetry-sdk/tests/shared_internal/test_batch_processor.py b/opentelemetry-sdk/tests/shared_internal/test_batch_processor.py index f5ed15110b9..e3b6babb9ef 100644 --- a/opentelemetry-sdk/tests/shared_internal/test_batch_processor.py +++ b/opentelemetry-sdk/tests/shared_internal/test_batch_processor.py @@ -20,6 +20,7 @@ import threading import time import unittest +import unittest.mock import weakref from platform import system from typing import Any @@ -171,6 +172,77 @@ def test_force_flush_flushes_telemetry( exporter.export.assert_called_once_with([telemetry for _ in range(10)]) batch_processor.shutdown() + # pylint: disable=no-self-use + def test_force_flush_returns_true_when_all_exported( + self, batch_processor_class, telemetry + ): + exporter = Mock() + batch_processor = batch_processor_class( + exporter, + max_queue_size=15, + max_export_batch_size=15, + schedule_delay_millis=30000, + export_timeout_millis=500, + ) + for _ in range(10): + batch_processor._batch_processor.emit(telemetry) + result = batch_processor.force_flush(timeout_millis=5000) + assert result is True + exporter.export.assert_called_once() + batch_processor.shutdown() + + # pylint: disable=no-self-use + def test_force_flush_returns_false_when_timeout_exceeded( + self, batch_processor_class, telemetry + ): + exporter = Mock() + batch_processor = batch_processor_class( + exporter, + max_queue_size=15, + max_export_batch_size=1, + schedule_delay_millis=30000, + export_timeout_millis=500, + ) + # Stop the worker thread first so it cannot export or interfere. + batch_processor._batch_processor._shutdown = True + batch_processor._batch_processor._worker_awaken.set() + batch_processor._batch_processor._worker_thread.join() + # Reset _shutdown so force_flush is not a no-op. 
+ batch_processor._batch_processor._shutdown = False + # Emit items after worker is stopped. + for _ in range(3): + batch_processor._batch_processor.emit(telemetry) + # Mock time.time(): first call computes deadline, second call is past it. + start = time.time() + with unittest.mock.patch( + "opentelemetry.sdk._shared_internal.time.time", + side_effect=[start, start + 1000], + ): + result = batch_processor.force_flush(timeout_millis=100) + assert result is False + batch_processor._batch_processor._shutdown = True + batch_processor._batch_processor._exporter.shutdown() + + # pylint: disable=no-self-use + def test_force_flush_returns_false_when_shutdown( + self, batch_processor_class, telemetry + ): + exporter = Mock() + batch_processor = batch_processor_class( + exporter, + max_queue_size=15, + max_export_batch_size=15, + schedule_delay_millis=30000, + export_timeout_millis=500, + ) + batch_processor.shutdown() + for _ in range(10): + batch_processor._batch_processor.emit(telemetry) + result = batch_processor.force_flush(timeout_millis=5000) + assert result is False + # Nothing should have been exported after shutdown. + exporter.export.assert_not_called() + @unittest.skipUnless( hasattr(os, "fork"), "needs *nix",