From e15a9ccfa2879c12cd3c43d0140ac9bfdc3b4425 Mon Sep 17 00:00:00 2001 From: Anubhav Tandon Date: Fri, 5 Jun 2026 05:22:03 +0530 Subject: [PATCH 1/6] Add soft-delete support to ClickHouse identity tracking --- .../0002_identities_add_is_deleted.py | 17 ++++++ api/edge_api/identities/models.py | 10 ++++ api/segment_membership/mappers.py | 9 ++- api/segment_membership/services.py | 4 +- api/segment_membership/tasks.py | 38 +++++++++++++ .../test_unit_segment_membership_mappers.py | 24 ++++++-- .../test_unit_segment_membership_tasks.py | 57 ++++++++++++++++++- 7 files changed, 149 insertions(+), 10 deletions(-) create mode 100644 api/clickhouse/migrations/0002_identities_add_is_deleted.py diff --git a/api/clickhouse/migrations/0002_identities_add_is_deleted.py b/api/clickhouse/migrations/0002_identities_add_is_deleted.py new file mode 100644 index 000000000000..fdb16e4b887b --- /dev/null +++ b/api/clickhouse/migrations/0002_identities_add_is_deleted.py @@ -0,0 +1,17 @@ +from django.db import migrations + +_ADD_COLUMN_DDL = ( + "ALTER TABLE IDENTITIES ADD COLUMN IF NOT EXISTS is_deleted Bool DEFAULT false" +) + + +class Migration(migrations.Migration): + # ClickHouse has no transactional DDL. + atomic = False + dependencies = [("clickhouse", "0001_create_identities")] + operations = [ + migrations.RunSQL( + _ADD_COLUMN_DDL, + reverse_sql=("ALTER TABLE IDENTITIES DROP COLUMN IF EXISTS is_deleted"), + ) + ] diff --git a/api/edge_api/identities/models.py b/api/edge_api/identities/models.py index 718bf78bde55..21a8dccb0d75 100644 --- a/api/edge_api/identities/models.py +++ b/api/edge_api/identities/models.py @@ -2,6 +2,7 @@ import typing from contextlib import suppress +from django.conf import settings from django.db.models import Prefetch, Q from api_keys.user import APIKeyUser @@ -17,6 +18,7 @@ from features.models import FeatureState from features.multivariate.models import MultivariateFeatureStateValue from features.versioning.versioning_service import get_environment_flags_dict +from segment_membership.tasks import write_identity_deletion_tombstone_to_clickhouse from users.models import FFAdminUser from util.engine_models.features.models import FeatureStateModel from util.engine_models.identities.models import IdentityFeaturesList, IdentityModel @@ -194,6 +196,14 @@ def delete(self, user: FFAdminUser | APIKeyUser = None) -> None: # type: ignore user=user, ) self._reset_initial_state() # type: ignore[no-untyped-call] + if settings.CLICKHOUSE_ENABLED: + write_identity_deletion_tombstone_to_clickhouse.delay( + args=( + self.engine_identity_model.environment_api_key, + self.engine_identity_model.identifier, + self.engine_identity_model.composite_key, + ) + ) def synchronise_features(self, valid_feature_names: typing.Collection[str]) -> None: identity_feature_names = { diff --git a/api/segment_membership/mappers.py b/api/segment_membership/mappers.py index 9ee572db4841..ba3675914b4d 100644 --- a/api/segment_membership/mappers.py +++ b/api/segment_membership/mappers.py @@ -2,16 +2,18 @@ from flagsmith_schemas import dynamodb -# (environment_id, identifier, identity_key, traits) -ClickHouseIdentityRow = tuple[str, str, str, dict[str, object] | None] +# (environment_id, identifier, identity_key, traits, is_deleted) +ClickHouseIdentityRow = tuple[str, str, str, dict[str, object] | None, bool] def map_identity_document_to_clickhouse_row( env_key: str, identity_doc: dynamodb.Identity, + *, + is_deleted: bool = False, ) -> ClickHouseIdentityRow: """Project a Dynamo identity document onto an IDENTITIES row tuple - `(environment_id, identifier, identity_key, traits)`.""" + `(environment_id, identifier, identity_key, traits, is_deleted)`.""" identifier = identity_doc["identifier"] composite_key = identity_doc["composite_key"] raw_traits = identity_doc.get("identity_traits") @@ -21,6 +23,7 @@ def map_identity_document_to_clickhouse_row( identifier, composite_key, traits, + is_deleted, ) diff --git a/api/segment_membership/services.py b/api/segment_membership/services.py index 9be17c1baee9..59d765e52b76 100644 --- a/api/segment_membership/services.py +++ b/api/segment_membership/services.py @@ -104,7 +104,9 @@ def compute_segment_counts_for_project( f"SELECT {seg.id} AS segment_id, " f"i.environment_id AS env_key, count() AS c " f"FROM IDENTITIES AS i FINAL " - f"WHERE i.environment_id IN %(env_keys)s AND ({predicate}) " + f"WHERE i.environment_id IN %(env_keys)s " + f"AND i.is_deleted = false " + f"AND ({predicate}) " f"GROUP BY i.environment_id" ) diff --git a/api/segment_membership/tasks.py b/api/segment_membership/tasks.py index 704ef7782ab6..3180ba085ab2 100644 --- a/api/segment_membership/tasks.py +++ b/api/segment_membership/tasks.py @@ -45,6 +45,7 @@ "identifier", "identity_key", "traits", + "is_deleted", ) _INSERT_IDENTITIES_SQL = ( @@ -188,3 +189,40 @@ def refresh_project_segment_counts(project_id: int) -> None: membership_counts__count=len(membership_counts), stale_counts__count=stale_deleted, ) + + +@register_task_handler() +def write_identity_deletion_tombstone_to_clickhouse( + env_key: str, + identifier: str, + identity_key: str, +) -> None: + """Insert a tombstone row for a deleted identity so it is excluded from + segment membership counts at the next refresh. + + ReplacingMergeTree(inserted_at) keeps the row with the highest + inserted_at per (environment_id, identifier). Because this row is + written after the identity is removed from Dynamo its inserted_at + will be newer than any prior live row, so FINAL deduplication will + always surface the tombstone. + """ + if not settings.CLICKHOUSE_ENABLED: + logger.info( + "tombstone.skipped", + reason="clickhouse_not_configured", + env_key=env_key, + identifier=identifier, + ) + return + + log_comment = f"flagsmith:segment_membership:tombstone:env_{env_key}" + with open_clickhouse_cursor(log_comment=log_comment) as cursor: + cursor.executemany( + _INSERT_IDENTITIES_SQL, + [(env_key, identifier, identity_key, None, True)], # type: ignore[arg-type] + ) + logger.info( + "tombstone.written", + env_key=env_key, + identifier=identifier, + ) diff --git a/api/tests/unit/segment_membership/test_unit_segment_membership_mappers.py b/api/tests/unit/segment_membership/test_unit_segment_membership_mappers.py index bd3cf464a27e..b42e256a2801 100644 --- a/api/tests/unit/segment_membership/test_unit_segment_membership_mappers.py +++ b/api/tests/unit/segment_membership/test_unit_segment_membership_mappers.py @@ -22,7 +22,7 @@ {"trait_key": "plan", "trait_value": "growth"}, ], }, - ("env-key", "alice", "env_x_alice", {"plan": "growth"}), + ("env-key", "alice", "env_x_alice", {"plan": "growth"}, False), id="single string trait", ), pytest.param( @@ -34,7 +34,7 @@ "created_date": "2026-05-08T00:00:00Z", "identity_traits": [], }, - ("env-key", "alice", "env_x_alice", None), + ("env-key", "alice", "env_x_alice", None, False), id="empty traits collapse to NULL", ), pytest.param( @@ -48,7 +48,7 @@ {"trait_key": "age", "trait_value": Decimal("18")}, ], }, - ("env-key", "alice", "env_x_alice", {"age": 18}), + ("env-key", "alice", "env_x_alice", {"age": 18}, False), id="whole-number Decimal narrows to int", ), pytest.param( @@ -62,7 +62,7 @@ {"trait_key": "score", "trait_value": Decimal("1.5")}, ], }, - ("env-key", "alice", "env_x_alice", {"score": 1.5}), + ("env-key", "alice", "env_x_alice", {"score": 1.5}, False), id="fractional Decimal narrows to float", ), pytest.param( @@ -82,6 +82,7 @@ "alice", "env_x_alice", {"plan": "growth", "team": "alpha"}, + False, ), id="multiple traits flatten to a single dict", ), @@ -89,9 +90,22 @@ ) def test_map_identity_document_to_clickhouse_row__cases__return_expected( doc: DynamoIdentity, - expected: tuple[str, str, str, dict[str, object] | None], + expected: tuple[str, str, str, dict[str, object] | None, bool], ) -> None: # Given a Dynamo identity document # When mapped onto an IDENTITIES row # Then it lines up positionally with the IDENTITIES schema assert map_identity_document_to_clickhouse_row("env-key", doc) == expected + + +def test_map_identity_document_to_clickhouse_row__is_deleted_true__sets_flag() -> None: + doc: DynamoIdentity = { + "identity_uuid": UUID_A, + "identifier": "alice", + "environment_api_key": "env-key", + "composite_key": "env_x_alice", + "created_date": "2026-05-08T00:00:00Z", + "identity_traits": [], + } + result = map_identity_document_to_clickhouse_row("env-key", doc, is_deleted=True) + assert result == ("env-key", "alice", "env_x_alice", None, True) diff --git a/api/tests/unit/segment_membership/test_unit_segment_membership_tasks.py b/api/tests/unit/segment_membership/test_unit_segment_membership_tasks.py index 3eff7ba86c38..1b84be1dbe0a 100644 --- a/api/tests/unit/segment_membership/test_unit_segment_membership_tasks.py +++ b/api/tests/unit/segment_membership/test_unit_segment_membership_tasks.py @@ -107,7 +107,7 @@ def test_backfill_identities_to_clickhouse__happy_path__bulk_inserts( sql, rows_arg = cursor.executemany.call_args.args assert sql == ( "INSERT INTO IDENTITIES " - "(environment_id, identifier, identity_key, traits) VALUES" + "(environment_id, identifier, identity_key, traits, is_deleted) VALUES" ) assert {row[0] for row in rows_arg} == {environment.api_key} assert {row[1] for row in rows_arg} == {"a", "b"} @@ -356,3 +356,58 @@ def test_refresh_project_segment_counts__never_matched_pair__no_row_written( assert not SegmentMembershipCount.objects.filter( segment=segment, environment=environment ).exists() + + +def test_write_identity_deletion_tombstone_to_clickhouse__clickhouse_disabled__skips( + mocker: MockerFixture, + settings: SettingsWrapper, + log: StructuredLogCapture, +) -> None: + # Given + settings.CLICKHOUSE_ENABLED = False + spy = mocker.patch.object(tasks, "open_clickhouse_cursor") + + # When + tasks.write_identity_deletion_tombstone_to_clickhouse( + env_key="env-abc", + identifier="alice", + identity_key="env-abc_alice", + ) + + # Then + spy.assert_not_called() + assert any(e["event"] == "tombstone.skipped" for e in log.events) + + +def test_write_identity_deletion_tombstone_to_clickhouse__clickhouse_enabled__writes_tombstone( + mocker: MockerFixture, + settings: SettingsWrapper, + log: StructuredLogCapture, +) -> None: + # Given + settings.CLICKHOUSE_ENABLED = True + cursor = MagicMock() + open_cursor = mocker.patch.object(tasks, "open_clickhouse_cursor") + open_cursor.return_value.__enter__.return_value = cursor + + # When + tasks.write_identity_deletion_tombstone_to_clickhouse( + env_key="env-abc", + identifier="alice", + identity_key="env-abc_alice", + ) + + # Then — exactly one INSERT with is_deleted=True + sql, rows_arg = cursor.executemany.call_args.args + assert sql == ( + "INSERT INTO IDENTITIES " + "(environment_id, identifier, identity_key, traits, is_deleted) VALUES" + ) + assert len(rows_arg) == 1 + row = rows_arg[0] + assert row[0] == "env-abc" # environment_id + assert row[1] == "alice" # identifier + assert row[2] == "env-abc_alice" # identity_key + assert row[3] is None # traits — NULL for tombstone + assert row[4] is True # is_deleted + assert any(e["event"] == "tombstone.written" for e in log.events) From bee3369b292768d93ae50dcbfa20df3cbda01946 Mon Sep 17 00:00:00 2001 From: Anubhav Tandon Date: Fri, 5 Jun 2026 05:44:17 +0530 Subject: [PATCH 2/6] Removed comment --- api/segment_membership/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/segment_membership/tasks.py b/api/segment_membership/tasks.py index 3180ba085ab2..9eb65c1eef7e 100644 --- a/api/segment_membership/tasks.py +++ b/api/segment_membership/tasks.py @@ -219,7 +219,7 @@ def write_identity_deletion_tombstone_to_clickhouse( with open_clickhouse_cursor(log_comment=log_comment) as cursor: cursor.executemany( _INSERT_IDENTITIES_SQL, - [(env_key, identifier, identity_key, None, True)], # type: ignore[arg-type] + [(env_key, identifier, identity_key, None, True)], ) logger.info( "tombstone.written", From 6f15fb93376c23c60a46c556415f363acb4ef46c Mon Sep 17 00:00:00 2001 From: Anubhav Tandon Date: Fri, 5 Jun 2026 05:48:42 +0530 Subject: [PATCH 3/6] Fixed linting error --- .../test_unit_segment_membership_mappers.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/api/tests/unit/segment_membership/test_unit_segment_membership_mappers.py b/api/tests/unit/segment_membership/test_unit_segment_membership_mappers.py index b42e256a2801..d515238191ee 100644 --- a/api/tests/unit/segment_membership/test_unit_segment_membership_mappers.py +++ b/api/tests/unit/segment_membership/test_unit_segment_membership_mappers.py @@ -99,6 +99,7 @@ def test_map_identity_document_to_clickhouse_row__cases__return_expected( def test_map_identity_document_to_clickhouse_row__is_deleted_true__sets_flag() -> None: + # Given a Dynamo identity document and is_deleted=True doc: DynamoIdentity = { "identity_uuid": UUID_A, "identifier": "alice", @@ -107,5 +108,9 @@ def test_map_identity_document_to_clickhouse_row__is_deleted_true__sets_flag() - "created_date": "2026-05-08T00:00:00Z", "identity_traits": [], } + + # When mapped with is_deleted=True result = map_identity_document_to_clickhouse_row("env-key", doc, is_deleted=True) + + # Then the flag is set in the returned tuple assert result == ("env-key", "alice", "env_x_alice", None, True) From 1fb26c71dc3ec3cec1851233bdbe064925d3e42c Mon Sep 17 00:00:00 2001 From: Anubhav Tandon Date: Fri, 5 Jun 2026 06:48:50 +0530 Subject: [PATCH 4/6] regenerate events catalogue for tombstone task log events --- .../observability/_events-catalogue.md | 35 ++++++++++++++----- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/docs/docs/deployment-self-hosting/observability/_events-catalogue.md b/docs/docs/deployment-self-hosting/observability/_events-catalogue.md index 3133424c66bd..565353f2b0bc 100644 --- a/docs/docs/deployment-self-hosting/observability/_events-catalogue.md +++ b/docs/docs/deployment-self-hosting/observability/_events-catalogue.md @@ -328,7 +328,7 @@ Attributes: ### `segment_membership.backfill.environment.completed` Logged at `info` from: - - `api/segment_membership/tasks.py:110` + - `api/segment_membership/tasks.py:111` Attributes: - `environment.id` @@ -338,7 +338,7 @@ Attributes: ### `segment_membership.backfill.environment.failed` Logged at `exception` from: - - `api/segment_membership/tasks.py:103` + - `api/segment_membership/tasks.py:104` Attributes: - `environment.id` @@ -347,8 +347,8 @@ Attributes: ### `segment_membership.backfill.skipped` Logged at `info` from: - - `api/segment_membership/tasks.py:67` - - `api/segment_membership/tasks.py:72` + - `api/segment_membership/tasks.py:68` + - `api/segment_membership/tasks.py:73` Attributes: - `reason` @@ -366,7 +366,7 @@ Attributes: ### `segment_membership.refresh.project.completed` Logged at `info` from: - - `api/segment_membership/tasks.py:185` + - `api/segment_membership/tasks.py:186` Attributes: - `membership_counts.count` @@ -376,7 +376,7 @@ Attributes: ### `segment_membership.refresh.project.failed` Logged at `exception` from: - - `api/segment_membership/tasks.py:158` + - `api/segment_membership/tasks.py:159` Attributes: - `project.id` @@ -384,13 +384,32 @@ Attributes: ### `segment_membership.refresh.project.skipped` Logged at `info` from: - - `api/segment_membership/tasks.py:129` - - `api/segment_membership/tasks.py:138` + - `api/segment_membership/tasks.py:130` + - `api/segment_membership/tasks.py:139` Attributes: - `project.id` - `reason` +### `segment_membership.tombstone.skipped` + +Logged at `info` from: + - `api/segment_membership/tasks.py:210` + +Attributes: + - `env_key` + - `identifier` + - `reason` + +### `segment_membership.tombstone.written` + +Logged at `info` from: + - `api/segment_membership/tasks.py:224` + +Attributes: + - `env_key` + - `identifier` + ### `segments.serializers.segment_revision_created` Logged at `info` from: From aa9687ad7e1c16477874ef1f3c7fa3ffc1ce97f1 Mon Sep 17 00:00:00 2001 From: Anubhav Tandon Date: Fri, 5 Jun 2026 07:00:25 +0530 Subject: [PATCH 5/6] lazy-import tombstone task to avoideager Prometheus init --- api/edge_api/identities/models.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/api/edge_api/identities/models.py b/api/edge_api/identities/models.py index 21a8dccb0d75..11fecd27a87d 100644 --- a/api/edge_api/identities/models.py +++ b/api/edge_api/identities/models.py @@ -18,7 +18,6 @@ from features.models import FeatureState from features.multivariate.models import MultivariateFeatureStateValue from features.versioning.versioning_service import get_environment_flags_dict -from segment_membership.tasks import write_identity_deletion_tombstone_to_clickhouse from users.models import FFAdminUser from util.engine_models.features.models import FeatureStateModel from util.engine_models.identities.models import IdentityFeaturesList, IdentityModel @@ -197,6 +196,10 @@ def delete(self, user: FFAdminUser | APIKeyUser = None) -> None: # type: ignore ) self._reset_initial_state() # type: ignore[no-untyped-call] if settings.CLICKHOUSE_ENABLED: + from segment_membership.tasks import ( + write_identity_deletion_tombstone_to_clickhouse, + ) + write_identity_deletion_tombstone_to_clickhouse.delay( args=( self.engine_identity_model.environment_api_key, From 01e0e80f013af57dc0d276778f3f9bd2eae433ca Mon Sep 17 00:00:00 2001 From: Anubhav Tandon Date: Fri, 5 Jun 2026 07:21:16 +0530 Subject: [PATCH 6/6] Add unit test for EdgeIdentity.delete() dispatch --- .../identities/test_edge_identity_models.py | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/api/tests/unit/edge_api/identities/test_edge_identity_models.py b/api/tests/unit/edge_api/identities/test_edge_identity_models.py index 64c7740c44a8..d4510da7aac5 100644 --- a/api/tests/unit/edge_api/identities/test_edge_identity_models.py +++ b/api/tests/unit/edge_api/identities/test_edge_identity_models.py @@ -6,6 +6,7 @@ from django.utils import timezone from freezegun import freeze_time from pytest_django import DjangoAssertNumQueries +from pytest_django.fixtures import SettingsWrapper from pytest_lazyfixture import lazy_fixture # type: ignore[import-untyped] from pytest_mock import MockerFixture @@ -509,6 +510,56 @@ def test_save__feature_override_updated__generates_audit_records( ) +def test_edge_identity_delete__clickhouse_enabled__dispatches_tombstone_task( + mocker: MockerFixture, + edge_identity_model: EdgeIdentity, + edge_identity_dynamo_wrapper_mock: MagicMock, + settings: SettingsWrapper, +) -> None: + # Given + settings.CLICKHOUSE_ENABLED = True + mock_tombstone_task = mocker.MagicMock() + mocker.patch( + "segment_membership.tasks.write_identity_deletion_tombstone_to_clickhouse", + mock_tombstone_task, + ) + + # When + edge_identity_model.delete() + + # Then + edge_identity_dynamo_wrapper_mock.delete_item.assert_called_once() + mock_tombstone_task.delay.assert_called_once_with( + args=( + edge_identity_model.environment_api_key, + edge_identity_model.identifier, + edge_identity_model.engine_identity_model.composite_key, + ) + ) + + +def test_edge_identity_delete__clickhouse_disabled__no_tombstone_dispatched( + mocker: MockerFixture, + edge_identity_model: EdgeIdentity, + edge_identity_dynamo_wrapper_mock: MagicMock, + settings: SettingsWrapper, +) -> None: + # Given + settings.CLICKHOUSE_ENABLED = False + mock_tombstone_task = mocker.MagicMock() + mocker.patch( + "segment_membership.tasks.write_identity_deletion_tombstone_to_clickhouse", + mock_tombstone_task, + ) + + # When + edge_identity_model.delete() + + # Then + edge_identity_dynamo_wrapper_mock.delete_item.assert_called_once() + mock_tombstone_task.delay.assert_not_called() + + def test_get_all_feature_states__post_v2_versioning_migration__returns_latest_overrides( environment: Environment, feature: Feature,