From 307014735d38620718085123eb04614b2aea448b Mon Sep 17 00:00:00 2001 From: wadii Date: Fri, 10 Apr 2026 16:48:35 +0200 Subject: [PATCH 01/13] feat/add-pipeline-analytics-processor-for-batching-events --- flagsmith/analytics.py | 152 ++++++++++++++++++++++++ tests/conftest.py | 21 +++- tests/test_pipeline_analytics.py | 194 +++++++++++++++++++++++++++++++ 3 files changed, 366 insertions(+), 1 deletion(-) create mode 100644 tests/test_pipeline_analytics.py diff --git a/flagsmith/analytics.py b/flagsmith/analytics.py index dee1ed5..8a93d3c 100644 --- a/flagsmith/analytics.py +++ b/flagsmith/analytics.py @@ -1,9 +1,17 @@ import json +import logging +import threading +import time import typing +from dataclasses import dataclass from datetime import datetime from requests_futures.sessions import FuturesSession # type: ignore +from flagsmith.version import __version__ + +logger = logging.getLogger(__name__) + ANALYTICS_ENDPOINT: typing.Final[str] = "analytics/flags/" # Used to control how often we send data(in seconds) @@ -60,3 +68,147 @@ def track_feature(self, feature_name: str) -> None: self.analytics_data[feature_name] = self.analytics_data.get(feature_name, 0) + 1 if (datetime.now() - self._last_flushed).seconds > ANALYTICS_TIMER: self.flush() + + +@dataclass +class PipelineAnalyticsConfig: + analytics_server_url: str + max_buffer: int = 1000 + flush_interval_seconds: float = 10.0 + + +class PipelineAnalyticsProcessor: + def __init__( + self, + config: PipelineAnalyticsConfig, + environment_key: str, + ) -> None: + url = config.analytics_server_url + if not url.endswith("/"): + url = f"{url}/" + self._batch_endpoint = f"{url}v1/analytics/batch" + self._environment_key = environment_key + self._max_buffer = config.max_buffer + self._flush_interval_seconds = config.flush_interval_seconds + + self._buffer: typing.List[typing.Dict[str, typing.Any]] = [] + self._dedup_keys: typing.Dict[str, str] = {} + self._lock = threading.Lock() + self._timer: typing.Optional[threading.Timer] = None + + def record_evaluation_event( + self, + flag_key: str, + enabled: bool, + value: typing.Any, + identity_identifier: typing.Optional[str] = None, + traits: typing.Optional[typing.Dict[str, typing.Any]] = None, + ) -> None: + fingerprint = f"{identity_identifier or 'none'}|{enabled}|{value}" + should_flush = False + + with self._lock: + if self._dedup_keys.get(flag_key) == fingerprint: + return + self._dedup_keys[flag_key] = fingerprint + self._buffer.append( + { + "event_id": flag_key, + "event_type": "flag_evaluation", + "evaluated_at": int(time.time() * 1000), + "identity_identifier": identity_identifier, + "enabled": enabled, + "value": value, + "traits": dict(traits) if traits else None, + "metadata": {"sdk_version": __version__}, + } + ) + if len(self._buffer) >= self._max_buffer: + should_flush = True + + if should_flush: + self.flush() + + def record_custom_event( + self, + event_name: str, + identity_identifier: typing.Optional[str] = None, + traits: typing.Optional[typing.Dict[str, typing.Any]] = None, + metadata: typing.Optional[typing.Dict[str, typing.Any]] = None, + ) -> None: + should_flush = False + + with self._lock: + self._buffer.append( + { + "event_id": event_name, + "event_type": "custom_event", + "evaluated_at": int(time.time() * 1000), + "identity_identifier": identity_identifier, + "enabled": None, + "value": None, + "traits": dict(traits) if traits else None, + "metadata": {**(metadata or {}), "sdk_version": __version__}, + } + ) + if len(self._buffer) >= self._max_buffer: + should_flush = True + + if should_flush: + self.flush() + + def flush(self) -> None: + with self._lock: + if not self._buffer: + return + events = self._buffer + self._buffer = [] + self._dedup_keys.clear() + + payload = json.dumps( + {"events": events, "environment_key": self._environment_key} + ) + future = session.post( + self._batch_endpoint, + data=payload, + timeout=3, + headers={ + "Content-Type": "application/json; charset=utf-8", + "X-Environment-Key": self._environment_key, + "Flagsmith-SDK-User-Agent": f"flagsmith-python-client/{__version__}", + }, + ) + future.add_done_callback(lambda f: self._handle_flush_result(f, events)) + + def _handle_flush_result( + self, + future: typing.Any, + events: typing.List[typing.Dict[str, typing.Any]], + ) -> None: + try: + response = future.result() + response.raise_for_status() + except Exception: + logger.warning("Failed to flush pipeline analytics, re-queuing events") + with self._lock: + self._buffer = events + self._buffer + self._buffer = self._buffer[: self._max_buffer] + + def start(self) -> None: + self._schedule_flush() + + def stop(self) -> None: + if self._timer is not None: + self._timer.cancel() + self.flush() + + def _schedule_flush(self) -> None: + self._timer = threading.Timer( + self._flush_interval_seconds, self._timer_flush + ) + self._timer.daemon = True + self._timer.start() + + def _timer_flush(self) -> None: + self.flush() + self._schedule_flush() diff --git a/tests/conftest.py b/tests/conftest.py index ce1153c..9c58c96 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -11,7 +11,11 @@ from pytest_mock import MockerFixture from flagsmith import Flagsmith -from flagsmith.analytics import AnalyticsProcessor +from flagsmith.analytics import ( + AnalyticsProcessor, + PipelineAnalyticsConfig, + PipelineAnalyticsProcessor, +) from flagsmith.api.types import EnvironmentModel from flagsmith.mappers import map_environment_document_to_context from flagsmith.types import SDKEvaluationContext @@ -26,6 +30,21 @@ def analytics_processor() -> AnalyticsProcessor: ) +@pytest.fixture() +def pipeline_analytics_config() -> PipelineAnalyticsConfig: + return PipelineAnalyticsConfig(analytics_server_url="http://test_analytics/") + + +@pytest.fixture() +def pipeline_analytics_processor( + pipeline_analytics_config: PipelineAnalyticsConfig, +) -> PipelineAnalyticsProcessor: + return PipelineAnalyticsProcessor( + config=pipeline_analytics_config, + environment_key="test_key", + ) + + @pytest.fixture(scope="session") def api_key() -> str: return "".join(random.sample(string.ascii_letters, 20)) diff --git a/tests/test_pipeline_analytics.py b/tests/test_pipeline_analytics.py new file mode 100644 index 0000000..4fd1144 --- /dev/null +++ b/tests/test_pipeline_analytics.py @@ -0,0 +1,194 @@ +import json +from concurrent.futures import Future +from unittest import mock + +import pytest + +from flagsmith.analytics import ( + PipelineAnalyticsConfig, + PipelineAnalyticsProcessor, +) + + +def test_record_evaluation_event_buffers_event( + pipeline_analytics_processor: PipelineAnalyticsProcessor, +) -> None: + pipeline_analytics_processor.record_evaluation_event( + flag_key="my_flag", + enabled=True, + value="variant_a", + identity_identifier="user123", + traits={"plan": "premium"}, + ) + + assert len(pipeline_analytics_processor._buffer) == 1 + event = pipeline_analytics_processor._buffer[0] + assert event["event_id"] == "my_flag" + assert event["event_type"] == "flag_evaluation" + assert event["identity_identifier"] == "user123" + assert event["enabled"] is True + assert event["value"] == "variant_a" + assert event["traits"] == {"plan": "premium"} + assert "sdk_version" in event["metadata"] + assert isinstance(event["evaluated_at"], int) + + +@pytest.mark.parametrize( + "second_enabled, expected_count", + [ + (True, 1), # same fingerprint -> deduplicated + (False, 2), # different fingerprint -> both kept + ], +) +def test_evaluation_event_deduplication( + pipeline_analytics_processor: PipelineAnalyticsProcessor, + second_enabled: bool, + expected_count: int, +) -> None: + pipeline_analytics_processor.record_evaluation_event( + flag_key="my_flag", enabled=True, value="v1", identity_identifier="user1" + ) + pipeline_analytics_processor.record_evaluation_event( + flag_key="my_flag", enabled=second_enabled, value="v1", identity_identifier="user1" + ) + + assert len(pipeline_analytics_processor._buffer) == expected_count + + +def test_dedup_keys_cleared_after_flush( + pipeline_analytics_processor: PipelineAnalyticsProcessor, +) -> None: + with mock.patch("flagsmith.analytics.session"): + pipeline_analytics_processor.record_evaluation_event( + flag_key="my_flag", enabled=True, value="v1", identity_identifier="user1" + ) + pipeline_analytics_processor.flush() + + pipeline_analytics_processor.record_evaluation_event( + flag_key="my_flag", enabled=True, value="v1", identity_identifier="user1" + ) + + assert len(pipeline_analytics_processor._buffer) == 1 + + +def test_auto_flush_on_buffer_full() -> None: + config = PipelineAnalyticsConfig( + analytics_server_url="http://test/", max_buffer=5 + ) + processor = PipelineAnalyticsProcessor(config=config, environment_key="key") + + with mock.patch("flagsmith.analytics.session"): + for i in range(5): + processor.record_evaluation_event( + flag_key=f"flag_{i}", enabled=True, value=str(i) + ) + + assert len(processor._buffer) == 0 + + +def test_flush_sends_correct_http_request( + pipeline_analytics_processor: PipelineAnalyticsProcessor, +) -> None: + with mock.patch("flagsmith.analytics.session") as mock_session: + pipeline_analytics_processor.record_evaluation_event( + flag_key="my_flag", enabled=True, value="v1", identity_identifier="user1" + ) + pipeline_analytics_processor.flush() + + mock_session.post.assert_called_once() + call_kwargs = mock_session.post.call_args + assert call_kwargs[0][0] == "http://test_analytics/v1/analytics/batch" + + headers = call_kwargs[1]["headers"] + assert headers["X-Environment-Key"] == "test_key" + assert headers["Content-Type"] == "application/json; charset=utf-8" + assert "flagsmith-python-client/" in headers["Flagsmith-SDK-User-Agent"] + + body = json.loads(call_kwargs[1]["data"]) + assert body["environment_key"] == "test_key" + assert len(body["events"]) == 1 + assert body["events"][0]["event_id"] == "my_flag" + + +def test_flush_noop_when_empty( + pipeline_analytics_processor: PipelineAnalyticsProcessor, +) -> None: + with mock.patch("flagsmith.analytics.session") as mock_session: + pipeline_analytics_processor.flush() + + mock_session.post.assert_not_called() + + +def test_failed_flush_requeues_events( + pipeline_analytics_processor: PipelineAnalyticsProcessor, +) -> None: + future: Future[None] = Future() + future.set_exception(Exception("connection error")) + + with mock.patch("flagsmith.analytics.session") as mock_session: + mock_session.post.return_value = future + pipeline_analytics_processor.record_evaluation_event( + flag_key="my_flag", enabled=True, value="v1" + ) + pipeline_analytics_processor.flush() + + assert len(pipeline_analytics_processor._buffer) == 1 + assert pipeline_analytics_processor._buffer[0]["event_id"] == "my_flag" + + +@pytest.mark.parametrize( + "url, expected_endpoint", + [ + ("http://example.com", "http://example.com/v1/analytics/batch"), + ("http://example.com/", "http://example.com/v1/analytics/batch"), + ], +) +def test_url_trailing_slash_handling(url: str, expected_endpoint: str) -> None: + config = PipelineAnalyticsConfig(analytics_server_url=url) + processor = PipelineAnalyticsProcessor(config=config, environment_key="key") + assert processor._batch_endpoint == expected_endpoint + + +def test_record_custom_event( + pipeline_analytics_processor: PipelineAnalyticsProcessor, +) -> None: + pipeline_analytics_processor.record_custom_event( + event_name="purchase", + identity_identifier="user1", + traits={"plan": "premium"}, + metadata={"amount": 99}, + ) + # Custom events are never deduplicated + pipeline_analytics_processor.record_custom_event( + event_name="purchase", + identity_identifier="user1", + ) + + assert len(pipeline_analytics_processor._buffer) == 2 + event = pipeline_analytics_processor._buffer[0] + assert event["event_id"] == "purchase" + assert event["event_type"] == "custom_event" + assert event["enabled"] is None + assert event["value"] is None + assert event["traits"] == {"plan": "premium"} + assert event["metadata"]["amount"] == 99 + assert "sdk_version" in event["metadata"] + + +def test_start_stop_lifecycle() -> None: + config = PipelineAnalyticsConfig( + analytics_server_url="http://test/", flush_interval_seconds=100 + ) + processor = PipelineAnalyticsProcessor(config=config, environment_key="key") + + processor.start() + assert processor._timer is not None + assert processor._timer.is_alive() + + with mock.patch("flagsmith.analytics.session"): + processor.record_evaluation_event( + flag_key="my_flag", enabled=True, value="v1" + ) + processor.stop() + + assert len(processor._buffer) == 0 From cc1c293c863477a7caef885346146b615eb6f8c1 Mon Sep 17 00:00:00 2001 From: wadii Date: Fri, 10 Apr 2026 16:54:57 +0200 Subject: [PATCH 02/13] feat/wire-analytics-pipeline-to-flag-evaluation --- flagsmith/models.py | 26 +++++++++++++++++++++++++- tests/test_models.py | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 1 deletion(-) diff --git a/flagsmith/models.py b/flagsmith/models.py index 72beb25..06d4c89 100644 --- a/flagsmith/models.py +++ b/flagsmith/models.py @@ -3,7 +3,7 @@ import typing from dataclasses import dataclass, field -from flagsmith.analytics import AnalyticsProcessor +from flagsmith.analytics import AnalyticsProcessor, PipelineAnalyticsProcessor from flagsmith.exceptions import FlagsmithFeatureDoesNotExistError from flagsmith.types import SDKEvaluationResult, SDKFlagResult @@ -57,6 +57,9 @@ class Flags: flags: typing.Dict[str, Flag] = field(default_factory=dict) default_flag_handler: typing.Optional[typing.Callable[[str], DefaultFlag]] = None _analytics_processor: typing.Optional[AnalyticsProcessor] = None + _pipeline_analytics_processor: typing.Optional[PipelineAnalyticsProcessor] = None + _identity_identifier: typing.Optional[str] = None + _traits: typing.Optional[typing.Dict[str, typing.Any]] = None @classmethod def from_evaluation_result( @@ -64,6 +67,9 @@ def from_evaluation_result( evaluation_result: SDKEvaluationResult, analytics_processor: typing.Optional[AnalyticsProcessor], default_flag_handler: typing.Optional[typing.Callable[[str], DefaultFlag]], + pipeline_analytics_processor: typing.Optional[PipelineAnalyticsProcessor] = None, + identity_identifier: typing.Optional[str] = None, + traits: typing.Optional[typing.Dict[str, typing.Any]] = None, ) -> Flags: return cls( flags={ @@ -73,6 +79,9 @@ def from_evaluation_result( }, default_flag_handler=default_flag_handler, _analytics_processor=analytics_processor, + _pipeline_analytics_processor=pipeline_analytics_processor, + _identity_identifier=identity_identifier, + _traits=traits, ) @classmethod @@ -81,6 +90,9 @@ def from_api_flags( api_flags: typing.Sequence[typing.Mapping[str, typing.Any]], analytics_processor: typing.Optional[AnalyticsProcessor], default_flag_handler: typing.Optional[typing.Callable[[str], DefaultFlag]], + pipeline_analytics_processor: typing.Optional[PipelineAnalyticsProcessor] = None, + identity_identifier: typing.Optional[str] = None, + traits: typing.Optional[typing.Dict[str, typing.Any]] = None, ) -> Flags: flags = { flag_data["feature"]["name"]: Flag.from_api_flag(flag_data) @@ -91,6 +103,9 @@ def from_api_flags( flags=flags, default_flag_handler=default_flag_handler, _analytics_processor=analytics_processor, + _pipeline_analytics_processor=pipeline_analytics_processor, + _identity_identifier=identity_identifier, + _traits=traits, ) def all_flags(self) -> typing.List[Flag]: @@ -141,6 +156,15 @@ def get_flag(self, feature_name: str) -> typing.Union[DefaultFlag, Flag]: if self._analytics_processor and hasattr(flag, "feature_name"): self._analytics_processor.track_feature(flag.feature_name) + if self._pipeline_analytics_processor and hasattr(flag, "feature_name"): + self._pipeline_analytics_processor.record_evaluation_event( + flag_key=flag.feature_name, + enabled=flag.enabled, + value=flag.value, + identity_identifier=self._identity_identifier, + traits=self._traits, + ) + return flag diff --git a/tests/test_models.py b/tests/test_models.py index c992395..6da01a7 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -1,7 +1,9 @@ import typing +from unittest import mock import pytest +from flagsmith.analytics import PipelineAnalyticsProcessor from flagsmith.models import Flag, Flags from flagsmith.types import SDKEvaluationResult, SDKFlagResult @@ -149,3 +151,33 @@ def test_flag_from_evaluation_result_missing_metadata__raises_expected() -> None # When & Then with pytest.raises(ValueError): Flag.from_evaluation_result(flag_result) + + +def test_get_flag_records_pipeline_evaluation_event( + pipeline_analytics_processor: PipelineAnalyticsProcessor, +) -> None: + flags = Flags( + flags={"my_feature": Flag(enabled=True, value="v1", feature_name="my_feature", feature_id=1)}, + _pipeline_analytics_processor=pipeline_analytics_processor, + _identity_identifier="user123", + _traits={"plan": "premium"}, + ) + + with mock.patch.object(pipeline_analytics_processor, "record_evaluation_event") as mock_record: + flags.get_flag("my_feature") + + mock_record.assert_called_once_with( + flag_key="my_feature", + enabled=True, + value="v1", + identity_identifier="user123", + traits={"plan": "premium"}, + ) + + +def test_get_flag_without_pipeline_processor() -> None: + flags = Flags( + flags={"my_feature": Flag(enabled=True, value="v1", feature_name="my_feature", feature_id=1)}, + ) + flag = flags.get_flag("my_feature") + assert flag.enabled is True From f6a84cfa5d8164e92ec7584e3649f87ded4b0abe Mon Sep 17 00:00:00 2001 From: wadii Date: Fri, 10 Apr 2026 17:08:29 +0200 Subject: [PATCH 03/13] feat/add-trackevent-and-pipeline-config-in-clients --- flagsmith/__init__.py | 3 +- flagsmith/flagsmith.py | 56 +++++++++++++++++++++++++++- tests/test_flagsmith.py | 82 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 139 insertions(+), 2 deletions(-) diff --git a/flagsmith/__init__.py b/flagsmith/__init__.py index 41473e4..d36feb8 100644 --- a/flagsmith/__init__.py +++ b/flagsmith/__init__.py @@ -1,5 +1,6 @@ from flagsmith import webhooks +from flagsmith.analytics import PipelineAnalyticsConfig from flagsmith.flagsmith import Flagsmith from flagsmith.version import __version__ -__all__ = ("Flagsmith", "webhooks", "__version__") +__all__ = ("Flagsmith", "PipelineAnalyticsConfig", "webhooks", "__version__") diff --git a/flagsmith/flagsmith.py b/flagsmith/flagsmith.py index 15da5ec..9461c0c 100644 --- a/flagsmith/flagsmith.py +++ b/flagsmith/flagsmith.py @@ -8,7 +8,11 @@ from requests.adapters import HTTPAdapter from urllib3 import Retry -from flagsmith.analytics import AnalyticsProcessor +from flagsmith.analytics import ( + AnalyticsProcessor, + PipelineAnalyticsConfig, + PipelineAnalyticsProcessor, +) from flagsmith.exceptions import FlagsmithAPIError, FlagsmithClientError from flagsmith.mappers import ( map_context_and_identity_data_to_context, @@ -63,6 +67,7 @@ def __init__( environment_refresh_interval_seconds: typing.Union[int, float] = 60, retries: typing.Optional[Retry] = None, enable_analytics: bool = False, + pipeline_analytics_config: typing.Optional[PipelineAnalyticsConfig] = None, default_flag_handler: typing.Optional[ typing.Callable[[str], DefaultFlag] ] = None, @@ -108,6 +113,7 @@ def __init__( self.default_flag_handler = default_flag_handler self.enable_realtime_updates = enable_realtime_updates self._analytics_processor: typing.Optional[AnalyticsProcessor] = None + self._pipeline_analytics_processor: typing.Optional[PipelineAnalyticsProcessor] = None self._evaluation_context: typing.Optional[SDKEvaluationContext] = None self._environment_updated_at: typing.Optional[datetime] = None @@ -175,6 +181,13 @@ def __init__( environment_key, self.api_url, timeout=self.request_timeout_seconds ) + if pipeline_analytics_config: + self._pipeline_analytics_processor = PipelineAnalyticsProcessor( + config=pipeline_analytics_config, + environment_key=environment_key, + ) + self._pipeline_analytics_processor.start() + def _initialise_local_evaluation(self) -> None: # To ensure that the environment is set before allowing subsequent # method calls, update the environment manually. @@ -290,6 +303,36 @@ def get_identity_segments( return map_segment_results_to_identity_segments(evaluation_result["segments"]) + def track_event( + self, + event_name: str, + identity_identifier: typing.Optional[str] = None, + traits: typing.Optional[TraitMapping] = None, + metadata: typing.Optional[typing.Dict[str, typing.Any]] = None, + ) -> None: + if not self._pipeline_analytics_processor: + raise ValueError( + "Pipeline analytics is not configured. " + "Provide pipeline_analytics_config to use track_event." + ) + self._pipeline_analytics_processor.record_custom_event( + event_name=event_name, + identity_identifier=identity_identifier, + traits=self._resolve_traits(traits), + metadata=metadata, + ) + + @staticmethod + def _resolve_traits( + traits: typing.Optional[TraitMapping], + ) -> typing.Optional[typing.Dict[str, typing.Any]]: + if not traits: + return None + return { + key: (val["value"] if isinstance(val, dict) else val) + for key, val in traits.items() + } + def update_environment(self) -> None: try: environment_data = self._get_json_response( @@ -345,6 +388,7 @@ def _get_environment_flags_from_document(self) -> Flags: evaluation_result=evaluation_result, analytics_processor=self._analytics_processor, default_flag_handler=self.default_flag_handler, + pipeline_analytics_processor=self._pipeline_analytics_processor, ) def _get_identity_flags_from_document( @@ -368,6 +412,9 @@ def _get_identity_flags_from_document( evaluation_result=evaluation_result, analytics_processor=self._analytics_processor, default_flag_handler=self.default_flag_handler, + pipeline_analytics_processor=self._pipeline_analytics_processor, + identity_identifier=identifier, + traits=self._resolve_traits(traits), ) def _get_environment_flags_from_api(self) -> Flags: @@ -379,6 +426,7 @@ def _get_environment_flags_from_api(self) -> Flags: api_flags=json_response, analytics_processor=self._analytics_processor, default_flag_handler=self.default_flag_handler, + pipeline_analytics_processor=self._pipeline_analytics_processor, ) except FlagsmithAPIError: if self.offline_handler: @@ -411,6 +459,9 @@ def _get_identity_flags_from_api( api_flags=json_response["flags"], analytics_processor=self._analytics_processor, default_flag_handler=self.default_flag_handler, + pipeline_analytics_processor=self._pipeline_analytics_processor, + identity_identifier=identifier, + traits=self._resolve_traits(traits), ) except FlagsmithAPIError: if self.offline_handler: @@ -443,3 +494,6 @@ def __del__(self) -> None: if hasattr(self, "event_stream_thread"): self.event_stream_thread.stop() + + if hasattr(self, "_pipeline_analytics_processor") and self._pipeline_analytics_processor: + self._pipeline_analytics_processor.stop() diff --git a/tests/test_flagsmith.py b/tests/test_flagsmith.py index 494e854..0cdb518 100644 --- a/tests/test_flagsmith.py +++ b/tests/test_flagsmith.py @@ -9,6 +9,7 @@ from responses import matchers from flagsmith import Flagsmith, __version__ +from flagsmith.analytics import PipelineAnalyticsConfig from flagsmith.api.types import EnvironmentModel from flagsmith.exceptions import ( FlagsmithAPIError, @@ -915,3 +916,84 @@ def test_flagsmith__init__expected_headers_sent( "Connection": "keep-alive", **expected_headers, } + + +def test_track_event_raises_without_config(api_key: str) -> None: + flagsmith = Flagsmith(environment_key=api_key) + with pytest.raises(ValueError, match="Pipeline analytics is not configured"): + flagsmith.track_event("purchase") + + +def test_track_event_calls_pipeline_processor( + mocker: MockerFixture, api_key: str +) -> None: + config = PipelineAnalyticsConfig(analytics_server_url="http://test/") + flagsmith = Flagsmith(environment_key=api_key, pipeline_analytics_config=config) + + mock_record = mocker.patch.object( + flagsmith._pipeline_analytics_processor, "record_custom_event" + ) + + flagsmith.track_event( + "purchase", + identity_identifier="user1", + traits={"plan": "premium"}, + metadata={"amount": 99}, + ) + + mock_record.assert_called_once_with( + event_name="purchase", + identity_identifier="user1", + traits={"plan": "premium"}, + metadata={"amount": 99}, + ) + + +@responses.activate() +def test_get_flag_records_evaluation_event_via_flagsmith( + mocker: MockerFixture, api_key: str, flags_json: str +) -> None: + config = PipelineAnalyticsConfig(analytics_server_url="http://test/") + flagsmith = Flagsmith(environment_key=api_key, pipeline_analytics_config=config) + + mock_record = mocker.patch.object( + flagsmith._pipeline_analytics_processor, "record_evaluation_event" + ) + + responses.add(method="GET", url=flagsmith.environment_flags_url, body=flags_json) + flags = flagsmith.get_environment_flags() + flags.get_flag("some_feature") + + mock_record.assert_called_once_with( + flag_key="some_feature", + enabled=True, + value="some-value", + identity_identifier=None, + traits=None, + ) + + +@responses.activate() +def test_get_identity_flags_passes_identity_and_traits( + mocker: MockerFixture, api_key: str, identities_json: str +) -> None: + config = PipelineAnalyticsConfig(analytics_server_url="http://test/") + flagsmith = Flagsmith(environment_key=api_key, pipeline_analytics_config=config) + + mock_record = mocker.patch.object( + flagsmith._pipeline_analytics_processor, "record_evaluation_event" + ) + + responses.add(method="POST", url=flagsmith.identities_url, body=identities_json) + flags = flagsmith.get_identity_flags( + "user123", traits={"plan": "premium"} + ) + flags.get_flag("some_feature") + + mock_record.assert_called_once_with( + flag_key="some_feature", + enabled=True, + value="some-value", + identity_identifier="user123", + traits={"plan": "premium"}, + ) From f8cdca5996eb194f16d6a70b80adeea34473e35a Mon Sep 17 00:00:00 2001 From: wadii Date: Mon, 13 Apr 2026 11:50:38 +0200 Subject: [PATCH 04/13] feat: log-runtime-error --- flagsmith/analytics.py | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/flagsmith/analytics.py b/flagsmith/analytics.py index 8a93d3c..9557a20 100644 --- a/flagsmith/analytics.py +++ b/flagsmith/analytics.py @@ -168,16 +168,20 @@ def flush(self) -> None: payload = json.dumps( {"events": events, "environment_key": self._environment_key} ) - future = session.post( - self._batch_endpoint, - data=payload, - timeout=3, - headers={ - "Content-Type": "application/json; charset=utf-8", - "X-Environment-Key": self._environment_key, - "Flagsmith-SDK-User-Agent": f"flagsmith-python-client/{__version__}", - }, - ) + try: + future = session.post( + self._batch_endpoint, + data=payload, + timeout=3, + headers={ + "Content-Type": "application/json; charset=utf-8", + "X-Environment-Key": self._environment_key, + "Flagsmith-SDK-User-Agent": f"flagsmith-python-client/{__version__}", + }, + ) + except RuntimeError: + logger.debug("Skipping flush: thread pool already shut down") + return future.add_done_callback(lambda f: self._handle_flush_result(f, events)) def _handle_flush_result( From f00647c743db19e9701a86bc433a144e572eaacb Mon Sep 17 00:00:00 2001 From: wadii Date: Mon, 13 Apr 2026 17:21:39 +0200 Subject: [PATCH 05/13] feat: extracted-resolve-trait-values --- flagsmith/analytics.py | 2 +- flagsmith/flagsmith.py | 18 ++++-------------- flagsmith/mappers.py | 31 ++++++++++++++----------------- tests/test_flagsmith.py | 27 +++++++++++++++++++++++++++ 4 files changed, 46 insertions(+), 32 deletions(-) diff --git a/flagsmith/analytics.py b/flagsmith/analytics.py index 9557a20..6b6b940 100644 --- a/flagsmith/analytics.py +++ b/flagsmith/analytics.py @@ -193,7 +193,7 @@ def _handle_flush_result( response = future.result() response.raise_for_status() except Exception: - logger.warning("Failed to flush pipeline analytics, re-queuing events") + logger.warning("Failed to flush pipeline analytics, re-queuing events", exc_info=True) with self._lock: self._buffer = events + self._buffer self._buffer = self._buffer[: self._max_buffer] diff --git a/flagsmith/flagsmith.py b/flagsmith/flagsmith.py index 9461c0c..fdd4b02 100644 --- a/flagsmith/flagsmith.py +++ b/flagsmith/flagsmith.py @@ -19,6 +19,7 @@ map_environment_document_to_context, map_environment_document_to_environment_updated_at, map_segment_results_to_identity_segments, + resolve_trait_values, ) from flagsmith.models import DefaultFlag, Flags, Segment from flagsmith.offline_handlers import OfflineHandler @@ -318,21 +319,10 @@ def track_event( self._pipeline_analytics_processor.record_custom_event( event_name=event_name, identity_identifier=identity_identifier, - traits=self._resolve_traits(traits), + traits=resolve_trait_values(traits), metadata=metadata, ) - @staticmethod - def _resolve_traits( - traits: typing.Optional[TraitMapping], - ) -> typing.Optional[typing.Dict[str, typing.Any]]: - if not traits: - return None - return { - key: (val["value"] if isinstance(val, dict) else val) - for key, val in traits.items() - } - def update_environment(self) -> None: try: environment_data = self._get_json_response( @@ -414,7 +404,7 @@ def _get_identity_flags_from_document( default_flag_handler=self.default_flag_handler, pipeline_analytics_processor=self._pipeline_analytics_processor, identity_identifier=identifier, - traits=self._resolve_traits(traits), + traits=resolve_trait_values(traits), ) def _get_environment_flags_from_api(self) -> Flags: @@ -461,7 +451,7 @@ def _get_identity_flags_from_api( default_flag_handler=self.default_flag_handler, pipeline_analytics_processor=self._pipeline_analytics_processor, identity_identifier=identifier, - traits=self._resolve_traits(traits), + traits=resolve_trait_values(traits), ) except FlagsmithAPIError: if self.offline_handler: diff --git a/flagsmith/mappers.py b/flagsmith/mappers.py index 18d1acf..7299a1b 100644 --- a/flagsmith/mappers.py +++ b/flagsmith/mappers.py @@ -27,6 +27,7 @@ SegmentMetadata, StreamEvent, TraitConfig, + TraitMapping, ) from flagsmith.utils.datetime import fromisoformat @@ -75,31 +76,27 @@ def map_environment_document_to_environment_updated_at( return updated_at.astimezone(tz=timezone.utc) +def resolve_trait_values( + traits: typing.Optional[TraitMapping], +) -> typing.Optional[typing.Dict[str, typing.Any]]: + if not traits: + return None + return { + key: (val["value"] if isinstance(val, dict) else val) + for key, val in traits.items() + } + + def map_context_and_identity_data_to_context( context: SDKEvaluationContext, identifier: str, - traits: typing.Optional[ - typing.Mapping[ - str, - typing.Union[ - ContextValue, - TraitConfig, - ], - ] - ], + traits: typing.Optional[TraitMapping] = None, ) -> SDKEvaluationContext: return { **context, "identity": { "identifier": identifier, - "traits": { - trait_key: ( - trait_value_or_config["value"] - if isinstance(trait_value_or_config, dict) - else trait_value_or_config - ) - for trait_key, trait_value_or_config in (traits or {}).items() - }, + "traits": resolve_trait_values(traits) or {}, }, } diff --git a/tests/test_flagsmith.py b/tests/test_flagsmith.py index 0cdb518..96fc130 100644 --- a/tests/test_flagsmith.py +++ b/tests/test_flagsmith.py @@ -997,3 +997,30 @@ def test_get_identity_flags_passes_identity_and_traits( identity_identifier="user123", traits={"plan": "premium"}, ) + + +@responses.activate() +def test_get_identity_flags_resolves_trait_config_values( + mocker: MockerFixture, api_key: str, identities_json: str +) -> None: + config = PipelineAnalyticsConfig(analytics_server_url="http://test/") + flagsmith = Flagsmith(environment_key=api_key, pipeline_analytics_config=config) + + mock_record = mocker.patch.object( + flagsmith._pipeline_analytics_processor, "record_evaluation_event" + ) + + responses.add(method="POST", url=flagsmith.identities_url, body=identities_json) + flags = flagsmith.get_identity_flags( + "user123", + traits={"plan": {"value": "premium", "transient": True}}, + ) + flags.get_flag("some_feature") + + mock_record.assert_called_once_with( + flag_key="some_feature", + enabled=True, + value="some-value", + identity_identifier="user123", + traits={"plan": "premium"}, + ) From cd5ab916d0ea9cefff5b08bd54f9af59f6e25f34 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 13 Apr 2026 15:30:39 +0000 Subject: [PATCH 06/13] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- flagsmith/analytics.py | 8 ++++---- flagsmith/flagsmith.py | 9 +++++++-- flagsmith/models.py | 8 ++++++-- tests/test_flagsmith.py | 4 +--- tests/test_models.py | 16 +++++++++++++--- tests/test_pipeline_analytics.py | 15 +++++++-------- 6 files changed, 38 insertions(+), 22 deletions(-) diff --git a/flagsmith/analytics.py b/flagsmith/analytics.py index 6b6b940..90418ec 100644 --- a/flagsmith/analytics.py +++ b/flagsmith/analytics.py @@ -193,7 +193,9 @@ def _handle_flush_result( response = future.result() response.raise_for_status() except Exception: - logger.warning("Failed to flush pipeline analytics, re-queuing events", exc_info=True) + logger.warning( + "Failed to flush pipeline analytics, re-queuing events", exc_info=True + ) with self._lock: self._buffer = events + self._buffer self._buffer = self._buffer[: self._max_buffer] @@ -207,9 +209,7 @@ def stop(self) -> None: self.flush() def _schedule_flush(self) -> None: - self._timer = threading.Timer( - self._flush_interval_seconds, self._timer_flush - ) + self._timer = threading.Timer(self._flush_interval_seconds, self._timer_flush) self._timer.daemon = True self._timer.start() diff --git a/flagsmith/flagsmith.py b/flagsmith/flagsmith.py index fdd4b02..acc24e9 100644 --- a/flagsmith/flagsmith.py +++ b/flagsmith/flagsmith.py @@ -114,7 +114,9 @@ def __init__( self.default_flag_handler = default_flag_handler self.enable_realtime_updates = enable_realtime_updates self._analytics_processor: typing.Optional[AnalyticsProcessor] = None - self._pipeline_analytics_processor: typing.Optional[PipelineAnalyticsProcessor] = None + self._pipeline_analytics_processor: typing.Optional[ + PipelineAnalyticsProcessor + ] = None self._evaluation_context: typing.Optional[SDKEvaluationContext] = None self._environment_updated_at: typing.Optional[datetime] = None @@ -485,5 +487,8 @@ def __del__(self) -> None: if hasattr(self, "event_stream_thread"): self.event_stream_thread.stop() - if hasattr(self, "_pipeline_analytics_processor") and self._pipeline_analytics_processor: + if ( + hasattr(self, "_pipeline_analytics_processor") + and self._pipeline_analytics_processor + ): self._pipeline_analytics_processor.stop() diff --git a/flagsmith/models.py b/flagsmith/models.py index 06d4c89..8d3765c 100644 --- a/flagsmith/models.py +++ b/flagsmith/models.py @@ -67,7 +67,9 @@ def from_evaluation_result( evaluation_result: SDKEvaluationResult, analytics_processor: typing.Optional[AnalyticsProcessor], default_flag_handler: typing.Optional[typing.Callable[[str], DefaultFlag]], - pipeline_analytics_processor: typing.Optional[PipelineAnalyticsProcessor] = None, + pipeline_analytics_processor: typing.Optional[ + PipelineAnalyticsProcessor + ] = None, identity_identifier: typing.Optional[str] = None, traits: typing.Optional[typing.Dict[str, typing.Any]] = None, ) -> Flags: @@ -90,7 +92,9 @@ def from_api_flags( api_flags: typing.Sequence[typing.Mapping[str, typing.Any]], analytics_processor: typing.Optional[AnalyticsProcessor], default_flag_handler: typing.Optional[typing.Callable[[str], DefaultFlag]], - pipeline_analytics_processor: typing.Optional[PipelineAnalyticsProcessor] = None, + pipeline_analytics_processor: typing.Optional[ + PipelineAnalyticsProcessor + ] = None, identity_identifier: typing.Optional[str] = None, traits: typing.Optional[typing.Dict[str, typing.Any]] = None, ) -> Flags: diff --git a/tests/test_flagsmith.py b/tests/test_flagsmith.py index 96fc130..fd73e87 100644 --- a/tests/test_flagsmith.py +++ b/tests/test_flagsmith.py @@ -985,9 +985,7 @@ def test_get_identity_flags_passes_identity_and_traits( ) responses.add(method="POST", url=flagsmith.identities_url, body=identities_json) - flags = flagsmith.get_identity_flags( - "user123", traits={"plan": "premium"} - ) + flags = flagsmith.get_identity_flags("user123", traits={"plan": "premium"}) flags.get_flag("some_feature") mock_record.assert_called_once_with( diff --git a/tests/test_models.py b/tests/test_models.py index 6da01a7..19355a9 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -157,13 +157,19 @@ def test_get_flag_records_pipeline_evaluation_event( pipeline_analytics_processor: PipelineAnalyticsProcessor, ) -> None: flags = Flags( - flags={"my_feature": Flag(enabled=True, value="v1", feature_name="my_feature", feature_id=1)}, + flags={ + "my_feature": Flag( + enabled=True, value="v1", feature_name="my_feature", feature_id=1 + ) + }, _pipeline_analytics_processor=pipeline_analytics_processor, _identity_identifier="user123", _traits={"plan": "premium"}, ) - with mock.patch.object(pipeline_analytics_processor, "record_evaluation_event") as mock_record: + with mock.patch.object( + pipeline_analytics_processor, "record_evaluation_event" + ) as mock_record: flags.get_flag("my_feature") mock_record.assert_called_once_with( @@ -177,7 +183,11 @@ def test_get_flag_records_pipeline_evaluation_event( def test_get_flag_without_pipeline_processor() -> None: flags = Flags( - flags={"my_feature": Flag(enabled=True, value="v1", feature_name="my_feature", feature_id=1)}, + flags={ + "my_feature": Flag( + enabled=True, value="v1", feature_name="my_feature", feature_id=1 + ) + }, ) flag = flags.get_flag("my_feature") assert flag.enabled is True diff --git a/tests/test_pipeline_analytics.py b/tests/test_pipeline_analytics.py index 4fd1144..1ba47f7 100644 --- a/tests/test_pipeline_analytics.py +++ b/tests/test_pipeline_analytics.py @@ -36,7 +36,7 @@ def test_record_evaluation_event_buffers_event( @pytest.mark.parametrize( "second_enabled, expected_count", [ - (True, 1), # same fingerprint -> deduplicated + (True, 1), # same fingerprint -> deduplicated (False, 2), # different fingerprint -> both kept ], ) @@ -49,7 +49,10 @@ def test_evaluation_event_deduplication( flag_key="my_flag", enabled=True, value="v1", identity_identifier="user1" ) pipeline_analytics_processor.record_evaluation_event( - flag_key="my_flag", enabled=second_enabled, value="v1", identity_identifier="user1" + flag_key="my_flag", + enabled=second_enabled, + value="v1", + identity_identifier="user1", ) assert len(pipeline_analytics_processor._buffer) == expected_count @@ -72,9 +75,7 @@ def test_dedup_keys_cleared_after_flush( def test_auto_flush_on_buffer_full() -> None: - config = PipelineAnalyticsConfig( - analytics_server_url="http://test/", max_buffer=5 - ) + config = PipelineAnalyticsConfig(analytics_server_url="http://test/", max_buffer=5) processor = PipelineAnalyticsProcessor(config=config, environment_key="key") with mock.patch("flagsmith.analytics.session"): @@ -186,9 +187,7 @@ def test_start_stop_lifecycle() -> None: assert processor._timer.is_alive() with mock.patch("flagsmith.analytics.session"): - processor.record_evaluation_event( - flag_key="my_flag", enabled=True, value="v1" - ) + processor.record_evaluation_event(flag_key="my_flag", enabled=True, value="v1") processor.stop() assert len(processor._buffer) == 0 From 9add012a6e9a35273d2e0508707f7d27eab751a2 Mon Sep 17 00:00:00 2001 From: wadii Date: Mon, 13 Apr 2026 18:55:42 +0200 Subject: [PATCH 07/13] feat: linter --- flagsmith/flagsmith.py | 31 +++++++++++++++++++++---------- flagsmith/mappers.py | 2 -- 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/flagsmith/flagsmith.py b/flagsmith/flagsmith.py index fdd4b02..7dee237 100644 --- a/flagsmith/flagsmith.py +++ b/flagsmith/flagsmith.py @@ -177,17 +177,28 @@ def __init__( self._initialise_local_evaluation() - if enable_analytics: - self._analytics_processor = AnalyticsProcessor( - environment_key, self.api_url, timeout=self.request_timeout_seconds - ) + self._initialise_analytics( + environment_key=environment_key, + enable_analytics=enable_analytics, + pipeline_analytics_config=pipeline_analytics_config, + ) - if pipeline_analytics_config: - self._pipeline_analytics_processor = PipelineAnalyticsProcessor( - config=pipeline_analytics_config, - environment_key=environment_key, - ) - self._pipeline_analytics_processor.start() + def _initialise_analytics( + self, + environment_key: str, + enable_analytics: bool, + pipeline_analytics_config: typing.Optional[PipelineAnalyticsConfig], + ) -> None: + if enable_analytics: + self._analytics_processor = AnalyticsProcessor( + environment_key, self.api_url, timeout=self.request_timeout_seconds + ) + if pipeline_analytics_config: + self._pipeline_analytics_processor = PipelineAnalyticsProcessor( + config=pipeline_analytics_config, + environment_key=environment_key, + ) + self._pipeline_analytics_processor.start() def _initialise_local_evaluation(self) -> None: # To ensure that the environment is set before allowing subsequent diff --git a/flagsmith/mappers.py b/flagsmith/mappers.py index 7299a1b..3ed3e3d 100644 --- a/flagsmith/mappers.py +++ b/flagsmith/mappers.py @@ -12,7 +12,6 @@ StrValueSegmentCondition, ) from flag_engine.result.types import SegmentResult -from flag_engine.segments.types import ContextValue from flagsmith.api.types import ( EnvironmentModel, @@ -26,7 +25,6 @@ SDKEvaluationContext, SegmentMetadata, StreamEvent, - TraitConfig, TraitMapping, ) from flagsmith.utils.datetime import fromisoformat From b31dac4ea3c6dabbd4a5ef1d69aa0a5a0cccb139 Mon Sep 17 00:00:00 2001 From: wadii Date: Tue, 14 Apr 2026 11:10:51 +0200 Subject: [PATCH 08/13] feat: trimmed-tests --- tests/test_flagsmith.py | 65 ++++++++++++-------------------- tests/test_models.py | 30 --------------- tests/test_pipeline_analytics.py | 13 ------- 3 files changed, 24 insertions(+), 84 deletions(-) diff --git a/tests/test_flagsmith.py b/tests/test_flagsmith.py index fd73e87..b0ad097 100644 --- a/tests/test_flagsmith.py +++ b/tests/test_flagsmith.py @@ -924,47 +924,25 @@ def test_track_event_raises_without_config(api_key: str) -> None: flagsmith.track_event("purchase") -def test_track_event_calls_pipeline_processor( - mocker: MockerFixture, api_key: str -) -> None: - config = PipelineAnalyticsConfig(analytics_server_url="http://test/") - flagsmith = Flagsmith(environment_key=api_key, pipeline_analytics_config=config) - - mock_record = mocker.patch.object( - flagsmith._pipeline_analytics_processor, "record_custom_event" - ) - - flagsmith.track_event( - "purchase", - identity_identifier="user1", - traits={"plan": "premium"}, - metadata={"amount": 99}, - ) - - mock_record.assert_called_once_with( - event_name="purchase", - identity_identifier="user1", - traits={"plan": "premium"}, - metadata={"amount": 99}, - ) - - @responses.activate() -def test_get_flag_records_evaluation_event_via_flagsmith( +def test_pipeline_analytics_records_events( mocker: MockerFixture, api_key: str, flags_json: str ) -> None: config = PipelineAnalyticsConfig(analytics_server_url="http://test/") flagsmith = Flagsmith(environment_key=api_key, pipeline_analytics_config=config) - mock_record = mocker.patch.object( + mock_eval = mocker.patch.object( flagsmith._pipeline_analytics_processor, "record_evaluation_event" ) + mock_custom = mocker.patch.object( + flagsmith._pipeline_analytics_processor, "record_custom_event" + ) responses.add(method="GET", url=flagsmith.environment_flags_url, body=flags_json) flags = flagsmith.get_environment_flags() flags.get_flag("some_feature") - mock_record.assert_called_once_with( + mock_eval.assert_called_once_with( flag_key="some_feature", enabled=True, value="some-value", @@ -972,9 +950,23 @@ def test_get_flag_records_evaluation_event_via_flagsmith( traits=None, ) + flagsmith.track_event( + "purchase", + identity_identifier="user1", + traits={"plan": "premium"}, + metadata={"amount": 99}, + ) + + mock_custom.assert_called_once_with( + event_name="purchase", + identity_identifier="user1", + traits={"plan": "premium"}, + metadata={"amount": 99}, + ) + @responses.activate() -def test_get_identity_flags_passes_identity_and_traits( +def test_identity_flags_records_evaluation_with_resolved_traits( mocker: MockerFixture, api_key: str, identities_json: str ) -> None: config = PipelineAnalyticsConfig(analytics_server_url="http://test/") @@ -985,6 +977,8 @@ def test_get_identity_flags_passes_identity_and_traits( ) responses.add(method="POST", url=flagsmith.identities_url, body=identities_json) + responses.add(method="POST", url=flagsmith.identities_url, body=identities_json) + flags = flagsmith.get_identity_flags("user123", traits={"plan": "premium"}) flags.get_flag("some_feature") @@ -996,19 +990,8 @@ def test_get_identity_flags_passes_identity_and_traits( traits={"plan": "premium"}, ) + mock_record.reset_mock() -@responses.activate() -def test_get_identity_flags_resolves_trait_config_values( - mocker: MockerFixture, api_key: str, identities_json: str -) -> None: - config = PipelineAnalyticsConfig(analytics_server_url="http://test/") - flagsmith = Flagsmith(environment_key=api_key, pipeline_analytics_config=config) - - mock_record = mocker.patch.object( - flagsmith._pipeline_analytics_processor, "record_evaluation_event" - ) - - responses.add(method="POST", url=flagsmith.identities_url, body=identities_json) flags = flagsmith.get_identity_flags( "user123", traits={"plan": {"value": "premium", "transient": True}}, diff --git a/tests/test_models.py b/tests/test_models.py index 19355a9..8c42093 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -1,9 +1,7 @@ import typing -from unittest import mock import pytest -from flagsmith.analytics import PipelineAnalyticsProcessor from flagsmith.models import Flag, Flags from flagsmith.types import SDKEvaluationResult, SDKFlagResult @@ -153,34 +151,6 @@ def test_flag_from_evaluation_result_missing_metadata__raises_expected() -> None Flag.from_evaluation_result(flag_result) -def test_get_flag_records_pipeline_evaluation_event( - pipeline_analytics_processor: PipelineAnalyticsProcessor, -) -> None: - flags = Flags( - flags={ - "my_feature": Flag( - enabled=True, value="v1", feature_name="my_feature", feature_id=1 - ) - }, - _pipeline_analytics_processor=pipeline_analytics_processor, - _identity_identifier="user123", - _traits={"plan": "premium"}, - ) - - with mock.patch.object( - pipeline_analytics_processor, "record_evaluation_event" - ) as mock_record: - flags.get_flag("my_feature") - - mock_record.assert_called_once_with( - flag_key="my_feature", - enabled=True, - value="v1", - identity_identifier="user123", - traits={"plan": "premium"}, - ) - - def test_get_flag_without_pipeline_processor() -> None: flags = Flags( flags={ diff --git a/tests/test_pipeline_analytics.py b/tests/test_pipeline_analytics.py index 1ba47f7..ebeda83 100644 --- a/tests/test_pipeline_analytics.py +++ b/tests/test_pipeline_analytics.py @@ -137,19 +137,6 @@ def test_failed_flush_requeues_events( assert pipeline_analytics_processor._buffer[0]["event_id"] == "my_flag" -@pytest.mark.parametrize( - "url, expected_endpoint", - [ - ("http://example.com", "http://example.com/v1/analytics/batch"), - ("http://example.com/", "http://example.com/v1/analytics/batch"), - ], -) -def test_url_trailing_slash_handling(url: str, expected_endpoint: str) -> None: - config = PipelineAnalyticsConfig(analytics_server_url=url) - processor = PipelineAnalyticsProcessor(config=config, environment_key="key") - assert processor._batch_endpoint == expected_endpoint - - def test_record_custom_event( pipeline_analytics_processor: PipelineAnalyticsProcessor, ) -> None: From aacf9f00c63b56b96a7b9d66fb7ae71eb936970a Mon Sep 17 00:00:00 2001 From: wadii Date: Wed, 15 Apr 2026 15:49:38 +0200 Subject: [PATCH 09/13] feat: renamed-max-buffer-to-max-buffer-items --- flagsmith/analytics.py | 4 ++-- tests/test_pipeline_analytics.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/flagsmith/analytics.py b/flagsmith/analytics.py index 90418ec..c3a3ffe 100644 --- a/flagsmith/analytics.py +++ b/flagsmith/analytics.py @@ -73,7 +73,7 @@ def track_feature(self, feature_name: str) -> None: @dataclass class PipelineAnalyticsConfig: analytics_server_url: str - max_buffer: int = 1000 + max_buffer_items: int = 1000 flush_interval_seconds: float = 10.0 @@ -88,7 +88,7 @@ def __init__( url = f"{url}/" self._batch_endpoint = f"{url}v1/analytics/batch" self._environment_key = environment_key - self._max_buffer = config.max_buffer + self._max_buffer = config.max_buffer_items self._flush_interval_seconds = config.flush_interval_seconds self._buffer: typing.List[typing.Dict[str, typing.Any]] = [] diff --git a/tests/test_pipeline_analytics.py b/tests/test_pipeline_analytics.py index ebeda83..078e026 100644 --- a/tests/test_pipeline_analytics.py +++ b/tests/test_pipeline_analytics.py @@ -75,7 +75,7 @@ def test_dedup_keys_cleared_after_flush( def test_auto_flush_on_buffer_full() -> None: - config = PipelineAnalyticsConfig(analytics_server_url="http://test/", max_buffer=5) + config = PipelineAnalyticsConfig(analytics_server_url="http://test/", max_buffer_items=5) processor = PipelineAnalyticsProcessor(config=config, environment_key="key") with mock.patch("flagsmith.analytics.session"): From b91349f47fd0d42e10e3680df0d090a2c262ec46 Mon Sep 17 00:00:00 2001 From: wadii Date: Wed, 15 Apr 2026 15:51:41 +0200 Subject: [PATCH 10/13] feat: remove-redundant-has-attr-check-in-del --- flagsmith/flagsmith.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/flagsmith/flagsmith.py b/flagsmith/flagsmith.py index cdc716b..0ff75fe 100644 --- a/flagsmith/flagsmith.py +++ b/flagsmith/flagsmith.py @@ -498,8 +498,5 @@ def __del__(self) -> None: if hasattr(self, "event_stream_thread"): self.event_stream_thread.stop() - if ( - hasattr(self, "_pipeline_analytics_processor") - and self._pipeline_analytics_processor - ): + if self._pipeline_analytics_processor: self._pipeline_analytics_processor.stop() From 55df6ebcfbb7f2e4a035b0394c772317dd763ffe Mon Sep 17 00:00:00 2001 From: wadii Date: Wed, 15 Apr 2026 15:56:26 +0200 Subject: [PATCH 11/13] feat: added-docstring-for-PipelineAnalyticsProcessor --- flagsmith/analytics.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/flagsmith/analytics.py b/flagsmith/analytics.py index c3a3ffe..b7e4d4b 100644 --- a/flagsmith/analytics.py +++ b/flagsmith/analytics.py @@ -78,6 +78,14 @@ class PipelineAnalyticsConfig: class PipelineAnalyticsProcessor: + """ + Buffered analytics processor that sends per-evaluation and custom events + to the Flagsmith pipeline analytics endpoint in batches. + + Evaluation events are deduplicated within each flush window. Events are + flushed periodically via a background timer or when the buffer is full. + """ + def __init__( self, config: PipelineAnalyticsConfig, From 7743a37a3beabc0d7853b215f7975318d7ebfda0 Mon Sep 17 00:00:00 2001 From: wadii Date: Wed, 15 Apr 2026 15:58:09 +0200 Subject: [PATCH 12/13] feat: use-at-exit-to-flush-analytics-on-process-exit --- flagsmith/analytics.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/flagsmith/analytics.py b/flagsmith/analytics.py index b7e4d4b..f0d0bab 100644 --- a/flagsmith/analytics.py +++ b/flagsmith/analytics.py @@ -1,3 +1,4 @@ +import atexit import json import logging import threading @@ -210,8 +211,10 @@ def _handle_flush_result( def start(self) -> None: self._schedule_flush() + atexit.register(self.stop) def stop(self) -> None: + atexit.unregister(self.stop) if self._timer is not None: self._timer.cancel() self.flush() From b38039f2a97a24e264c8c6127394d8191a61a1d0 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 15 Apr 2026 14:23:56 +0000 Subject: [PATCH 13/13] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- tests/test_pipeline_analytics.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/test_pipeline_analytics.py b/tests/test_pipeline_analytics.py index 078e026..2a45899 100644 --- a/tests/test_pipeline_analytics.py +++ b/tests/test_pipeline_analytics.py @@ -75,7 +75,9 @@ def test_dedup_keys_cleared_after_flush( def test_auto_flush_on_buffer_full() -> None: - config = PipelineAnalyticsConfig(analytics_server_url="http://test/", max_buffer_items=5) + config = PipelineAnalyticsConfig( + analytics_server_url="http://test/", max_buffer_items=5 + ) processor = PipelineAnalyticsProcessor(config=config, environment_key="key") with mock.patch("flagsmith.analytics.session"):