diff --git a/posthog/capture_v1.py b/posthog/capture_v1.py
new file mode 100644
index 00000000..20c26714
--- /dev/null
+++ b/posthog/capture_v1.py
@@ -0,0 +1,185 @@
+"""Pure serialization helpers for the Capture V1 wire protocol.
+
+This module holds the *transform* layer for ``POST /i/v1/analytics/events``:
+turning a legacy-shaped queued message into a v1 wire event, and assembling the
+v1 batch envelope. It performs no I/O — the HTTP transport and partial-retry
+logic live alongside it but are added separately.
+
+The v1 contract (see ``rust/capture/src/v1/analytics/types.rs``) differs from
+the legacy ``/batch/`` shape in a few load-bearing ways that this module
+encodes:
+
+- A typed ``options`` object carries a handful of sentinel properties, renamed
+  and strictly typed. Wrong JSON types fail deserialization of the *whole
+  batch*, so values are coerced to native types or omitted entirely.
+- ``$set``/``$set_once`` have no top-level form in v1; the server reads them
+  from ``properties``. The legacy ``set()``/``set_once()`` builders emit them at
+  the top level, so they are relocated into ``properties`` here.
+- ``$lib``/``$lib_version`` are injected server-side from the required
+  ``PostHog-Sdk-Info`` header and are stripped from v1 properties.
+"""
+
+from collections.abc import Callable
+from datetime import datetime, timezone
+from typing import Any, Optional
+
+from posthog.utils import guess_timezone as _guess_timezone
+
+CAPTURE_V1_PATH = "/i/v1/analytics/events"
+
+# Required request/response headers for the v1 endpoint. Defined here as the
+# single source of truth; the transport layer builds requests from them.
+HEADER_SDK_INFO = "PostHog-Sdk-Info"
+HEADER_ATTEMPT = "PostHog-Attempt"
+HEADER_REQUEST_ID = "PostHog-Request-Id"
+HEADER_REQUEST_TIMESTAMP = "PostHog-Request-Timestamp"
+
+# Per-event result codes the backend emits (rust EventResult). `ok`/`warning`
+# are terminal-success; `drop` terminal-failure; `retry` is safe to resend.
+RESULT_OK = "ok"
+RESULT_WARNING = "warning"
+RESULT_DROP = "drop"
+RESULT_RETRY = "retry"
+
+# HTTP status classification. 429 is terminal in v1 (unlike v0, where it is
+# retried) — the backend signals overload via retryable 5xx + Retry-After.
+RETRYABLE_STATUSES = frozenset({408, 500, 502, 503, 504})
+TERMINAL_STATUSES = frozenset({400, 401, 402, 413, 415, 429})
+
+# Sentinel properties lifted to top-level string fields on the event.
+_TOPLEVEL_SENTINELS: tuple[tuple[str, str], ...] = (
+    ("$session_id", "session_id"),
+    ("$window_id", "window_id"),
+)
+
+# Top-level legacy keys relocated into properties (v1 has no top-level form).
+_RELOCATE_TO_PROPERTIES = ("$set", "$set_once")
+
+# Properties dropped from v1 events (server injects them from PostHog-Sdk-Info).
+_STRIP_FROM_PROPERTIES = ("$lib", "$lib_version")
+
+
+def _coerce_bool(value: Any) -> Optional[bool]:
+    """Coerce a sentinel value to ``bool`` using the backend's truthiness rules.
+
+    Native bool passes through; ``"true"``/``"1"`` and ``"false"``/``"0"``
+    (case-insensitive, trimmed) map to the obvious bool; any other numeric value
+    is nonzero-truthy. Anything else returns ``None`` so the option is omitted
+    rather than sent with a type the strict v1 schema would reject.
+    """
+    if isinstance(value, bool):
+        return value
+    if isinstance(value, str):
+        normalized = value.strip().lower()
+        if normalized in ("true", "1"):
+            return True
+        if normalized in ("false", "0"):
+            return False
+        return None
+    if isinstance(value, (int, float)):
+        return value != 0
+    return None
+
+
+def _coerce_str(value: Any) -> Optional[str]:
+    """Accept only ``str``; any other type returns ``None`` so the field is omitted."""
+    return value if isinstance(value, str) else None
+
+
+# Sentinel properties lifted into the typed `options` object: legacy property
+# key, the backend's field name, and the coercer enforcing its strict type
+# (wrong JSON types fail deserialization of the whole batch, so a value that
+# won't coerce is omitted). The coercer is stored directly to keep the dispatch
+# type-checked rather than keyed by a stringly-typed name.
+_OPTION_SENTINELS: tuple[tuple[str, str, Callable[[Any], Any]], ...] = (
+    ("$cookieless_mode", "cookieless_mode", _coerce_bool),
+    ("$ignore_sent_at", "disable_skew_correction", _coerce_bool),
+    ("$product_tour_id", "product_tour_id", _coerce_str),
+    ("$process_person_profile", "process_person_profile", _coerce_bool),
+)
+
+
+def _v1_timestamp(timestamp: Any) -> str:
+    """Return a timezone-aware RFC3339 timestamp string.
+
+    Messages off the queue already carry an ISO-8601 string (``_enqueue`` runs
+    ``guess_timezone(...).isoformat()``), so non-``datetime`` values pass through
+    unvalidated. A ``datetime`` is made timezone-aware and serialized; ``None``
+    defaults to now in UTC. The v1 server parses strictly with
+    ``DateTime::parse_from_rfc3339`` and rejects naive timestamps.
+    """
+    if timestamp is None:
+        return datetime.now(timezone.utc).isoformat()
+    if isinstance(timestamp, datetime):
+        return _guess_timezone(timestamp).isoformat()
+    return timestamp
+
+
+def to_v1_event(msg: dict) -> dict:
+    """Transform a legacy-shaped queued message into a v1 wire event.
+
+    Pure: ``msg`` is not mutated (``properties`` is shallow-copied, so nested
+    values are shared), and the original stays usable for retries or callbacks.
+    """
+    properties = dict(msg.get("properties") or {})
+
+    # Relocate top-level $set/$set_once into properties; v1 has no top-level
+    # form. If properties already carries the key: two dicts are merged with
+    # the properties entries winning; otherwise the properties value is kept.
+    for key in _RELOCATE_TO_PROPERTIES:
+        top_val = msg.get(key)
+        if top_val is None:
+            continue
+        existing = properties.get(key)
+        if isinstance(top_val, dict) and isinstance(existing, dict):
+            properties[key] = {**top_val, **existing}
+        elif key not in properties:
+            properties[key] = top_val
+
+    for key in _STRIP_FROM_PROPERTIES:
+        properties.pop(key, None)
+
+    options: dict[str, Any] = {}
+    for prop_key, wire_key, coercer in _OPTION_SENTINELS:
+        if prop_key not in properties:
+            continue
+        # Always removed from properties — these sentinels must never reach v1
+        # backend properties — but only emitted as an option when coercible.
+        coerced = coercer(properties.pop(prop_key))
+        if coerced is not None:
+            options[wire_key] = coerced
+
+    top_level: dict[str, str] = {}
+    for prop_key, field_name in _TOPLEVEL_SENTINELS:
+        if prop_key not in properties:
+            continue
+        coerced_str = _coerce_str(properties.pop(prop_key))
+        if coerced_str is not None:
+            top_level[field_name] = coerced_str
+
+    event = {
+        "event": msg["event"],
+        "uuid": msg["uuid"],
+        "distinct_id": msg["distinct_id"],
+        "timestamp": _v1_timestamp(msg.get("timestamp")),
+        # Always a dict so it serializes as "{}" rather than null when empty.
+        "options": options,
+        "properties": properties,
+    }
+    event.update(top_level)
+    return event
+
+
+def build_v1_batch_body(events: list[dict], historical_migration: bool = False) -> dict:
+    """Assemble the v1 batch envelope.
+
+    Carries no ``api_key`` (Bearer auth) and no ``sent_at``.
+    ``historical_migration`` is omitted when False (the server defaults it).
+    """
+    body: dict[str, Any] = {
+        "created_at": datetime.now(timezone.utc).isoformat(),
+        "batch": events,
+    }
+    if historical_migration:
+        body["historical_migration"] = True
+    return body
diff --git a/posthog/test/test_capture_v1.py b/posthog/test/test_capture_v1.py
new file mode 100644
index 00000000..8e855533
--- /dev/null
+++ b/posthog/test/test_capture_v1.py
@@ -0,0 +1,251 @@
+import unittest
+from datetime import datetime, timezone
+
+from parameterized import parameterized
+
+from posthog.capture_v1 import (
+    build_v1_batch_body,
+    to_v1_event,
+    _coerce_bool,
+    _coerce_str,
+)
+
+
+def _legacy_msg(event="my_event", properties=None, **overrides) -> dict:
+    """Minimal legacy-shaped message as it looks coming off the queue."""
+    msg = {
+        "event": event,
+        "uuid": "0190000000007000800000000000000a",
+        "distinct_id": "user-1",
+        "timestamp": "2026-06-27T12:00:00+00:00",
+        "type": "capture",
+        "properties": {"$lib": "posthog-python", "$lib_version": "9.9.9"},
+    }
+    if properties is not None:
+        msg["properties"] = properties
+    msg.update(overrides)
+    return msg
+
+
+class TestCoercion(unittest.TestCase):
+    @parameterized.expand(
+        [
+            ("bool_true", True, True),
+            ("bool_false", False, False),
+            ("str_true", "true", True),
+            ("str_true_upper", "TRUE", True),
+            ("str_true_padded", " true ", True),
+            ("str_one", "1", True),
+            ("str_false", "false", False),
+            ("str_zero", "0", False),
+            ("int_nonzero", 5, True),
+            ("int_zero", 0, False),
+            ("float_nonzero", 1.5, True),
+            ("float_zero", 0.0, False),
+            ("neg_int", -1, True),
+            ("str_yes_uncoercible", "yes", None),
+            ("str_empty_uncoercible", "", None),
+            ("none_uncoercible", None, None),
+            ("dict_uncoercible", {"a": 1}, None),
+        ]
+    )
+    def test_coerce_bool(self, _name, value, expected) -> None:
+        self.assertIs(_coerce_bool(value), expected)
+
+    @parameterized.expand(
+        [
+            ("str", "tour-1", "tour-1"),
+            ("empty_str", "", ""),
+            ("int", 123, None),
+            ("bool", True, None),
+            ("none", None, None),
+        ]
+    )
+    def test_coerce_str(self, _name, value, expected) -> None:
+        self.assertEqual(_coerce_str(value), expected)
+
+
+class TestToV1Event(unittest.TestCase):
+    def test_required_fields_preserved(self) -> None:
+        event = to_v1_event(_legacy_msg(event="signed_up"))
+        self.assertEqual(event["event"], "signed_up")
+        self.assertEqual(event["uuid"], "0190000000007000800000000000000a")
+        self.assertEqual(event["distinct_id"], "user-1")
+        self.assertEqual(event["timestamp"], "2026-06-27T12:00:00+00:00")
+
+    def test_strips_lib_and_lib_version(self) -> None:
+        event = to_v1_event(_legacy_msg())
+        self.assertNotIn("$lib", event["properties"])
+        self.assertNotIn("$lib_version", event["properties"])
+
+    def test_options_empty_dict_when_no_sentinels(self) -> None:
+        event = to_v1_event(_legacy_msg(properties={"plain": "value"}))
+        self.assertEqual(event["options"], {})
+        self.assertEqual(event["properties"], {"plain": "value"})
+
+    def test_does_not_leak_non_wire_top_level_keys(self) -> None:
+        event = to_v1_event(_legacy_msg())
+        # `type` is legacy-only; the v1 event carries only documented fields.
+        self.assertEqual(
+            set(event),
+            {"event", "uuid", "distinct_id", "timestamp", "options", "properties"},
+        )
+
+    def test_does_not_mutate_input(self) -> None:
+        msg = _legacy_msg(
+            properties={"$cookieless_mode": True, "$session_id": "s-1"},
+            **{"$set": {"name": "Max"}},
+        )
+        original_properties = dict(msg["properties"])
+        to_v1_event(msg)
+        self.assertEqual(msg["properties"], original_properties)
+        self.assertIn("$set", msg)  # top-level $set untouched on the original
+
+    @parameterized.expand(
+        [
+            ("cookieless_mode", "$cookieless_mode", "cookieless_mode", True, True),
+            (
+                "ignore_sent_at_rename",
+                "$ignore_sent_at",
+                "disable_skew_correction",
+                "true",
+                True,
+            ),
+            (
+                "process_person_profile",
+                "$process_person_profile",
+                "process_person_profile",
+                "false",
+                False,
+            ),
+            (
+                "product_tour_id",
+                "$product_tour_id",
+                "product_tour_id",
+                "tour-7",
+                "tour-7",
+            ),
+        ]
+    )
+    def test_option_sentinels_lifted_renamed_and_coerced(
+        self, _name, prop_key, wire_key, raw, expected
+    ) -> None:
+        event = to_v1_event(_legacy_msg(properties={prop_key: raw}))
+        self.assertEqual(event["options"], {wire_key: expected})
+        self.assertNotIn(prop_key, event["properties"])
+
+    @parameterized.expand(
+        [
+            ("bad_bool", "$cookieless_mode", "maybe"),
+            ("bad_tour_id_int", "$product_tour_id", 123),
+        ]
+    )
+    def test_option_sentinel_removed_but_omitted_on_bad_coercion(
+        self, _name, prop_key, raw
+    ) -> None:
+        event = to_v1_event(_legacy_msg(properties={prop_key: raw}))
+        # Removed from properties (sentinels must never reach v1 props) but not
+        # emitted as an option, so a wrong type cannot 400 the whole batch.
+        self.assertNotIn(prop_key, event["properties"])
+        self.assertEqual(event["options"], {})
+
+    @parameterized.expand(
+        [
+            ("session_id", "$session_id", "session_id", "s-123"),
+            ("window_id", "$window_id", "window_id", "w-456"),
+        ]
+    )
+    def test_top_level_string_sentinels(self, _name, prop_key, field_name, raw) -> None:
+        event = to_v1_event(_legacy_msg(properties={prop_key: raw}))
+        self.assertEqual(event[field_name], raw)
+        self.assertNotIn(prop_key, event["properties"])
+
+    def test_top_level_sentinel_omitted_but_removed_when_not_string(self) -> None:
+        event = to_v1_event(_legacy_msg(properties={"$session_id": 42}))
+        self.assertNotIn("session_id", event)
+        self.assertNotIn("$session_id", event["properties"])
+
+    def test_all_sentinels_together(self) -> None:
+        event = to_v1_event(
+            _legacy_msg(
+                properties={
+                    "$cookieless_mode": True,
+                    "$ignore_sent_at": "1",
+                    "$product_tour_id": "tour-x",
+                    "$process_person_profile": 0,
+                    "$session_id": "s-1",
+                    "$window_id": "w-1",
+                    "$geoip_disable": True,
+                    "custom": "keep",
+                }
+            )
+        )
+        self.assertEqual(
+            event["options"],
+            {
+                "cookieless_mode": True,
+                "disable_skew_correction": True,
+                "product_tour_id": "tour-x",
+                "process_person_profile": False,
+            },
+        )
+        self.assertEqual(event["session_id"], "s-1")
+        self.assertEqual(event["window_id"], "w-1")
+        # Non-sentinel props (including $geoip_disable) are left intact.
+        self.assertEqual(
+            event["properties"], {"$geoip_disable": True, "custom": "keep"}
+        )
+
+    @parameterized.expand([("set", "$set"), ("set_once", "$set_once")])
+    def test_top_level_set_relocated_into_properties(self, _name, key) -> None:
+        msg = _legacy_msg(properties={}, **{key: {"email": "a@b.com"}})
+        event = to_v1_event(msg)
+        self.assertEqual(event["properties"][key], {"email": "a@b.com"})
+        self.assertNotIn(key, event)  # not a top-level v1 field
+
+    def test_top_level_set_merges_with_existing_properties_set(self) -> None:
+        # properties wins on key collision.
+        msg = _legacy_msg(
+            properties={"$set": {"a": "from_props", "b": "props_only"}},
+            **{"$set": {"a": "from_top", "c": "top_only"}},
+        )
+        event = to_v1_event(msg)
+        self.assertEqual(
+            event["properties"]["$set"],
+            {"a": "from_props", "b": "props_only", "c": "top_only"},
+        )
+
+    def test_groups_left_in_properties(self) -> None:
+        event = to_v1_event(_legacy_msg(properties={"$groups": {"company": "ph"}}))
+        self.assertEqual(event["properties"]["$groups"], {"company": "ph"})
+
+    def test_timestamp_naive_datetime_made_tz_aware(self) -> None:
+        event = to_v1_event(_legacy_msg(timestamp=datetime(2026, 6, 27, 12, 0, 0)))
+        parsed = datetime.fromisoformat(event["timestamp"])
+        self.assertIsNotNone(parsed.tzinfo)
+
+    def test_timestamp_none_defaults_to_utc_now(self) -> None:
+        event = to_v1_event(_legacy_msg(timestamp=None))
+        parsed = datetime.fromisoformat(event["timestamp"])
+        self.assertEqual(parsed.tzinfo, timezone.utc)
+
+
+class TestBuildV1BatchBody(unittest.TestCase):
+    def test_envelope_shape_and_no_legacy_fields(self) -> None:
+        events = [{"event": "e"}]
+        body = build_v1_batch_body(events)
+        self.assertEqual(body["batch"], events)
+        self.assertNotIn("api_key", body)
+        self.assertNotIn("sent_at", body)
+
+    def test_created_at_is_tz_aware_rfc3339(self) -> None:
+        body = build_v1_batch_body([])
+        parsed = datetime.fromisoformat(body["created_at"])
+        self.assertIsNotNone(parsed.tzinfo)
+
+    def test_historical_migration_omitted_when_false(self) -> None:
+        self.assertNotIn("historical_migration", build_v1_batch_body([]))
+
+    def test_historical_migration_present_when_true(self) -> None:
+        body = build_v1_batch_body([], historical_migration=True)
+        self.assertIs(body["historical_migration"], True)
diff --git a/references/public_api_snapshot.txt b/references/public_api_snapshot.txt
index 0955c651..f4d9d0b7 100644
--- a/references/public_api_snapshot.txt
+++ b/references/public_api_snapshot.txt
@@ -479,6 +479,17 @@ attribute posthog.capture_mode.CAPTURE_MODE_ENV_VAR = 'POSTHOG_CAPTURE_MODE'
 attribute posthog.capture_mode.CaptureMode.V0 = 'v0'
 attribute posthog.capture_mode.CaptureMode.V1 = 'v1'
 attribute posthog.capture_mode.log = logging.getLogger('posthog')
+attribute posthog.capture_v1.CAPTURE_V1_PATH = '/i/v1/analytics/events'
+attribute posthog.capture_v1.HEADER_ATTEMPT = 'PostHog-Attempt'
+attribute posthog.capture_v1.HEADER_REQUEST_ID = 'PostHog-Request-Id'
+attribute posthog.capture_v1.HEADER_REQUEST_TIMESTAMP = 'PostHog-Request-Timestamp'
+attribute posthog.capture_v1.HEADER_SDK_INFO = 'PostHog-Sdk-Info'
+attribute posthog.capture_v1.RESULT_DROP = 'drop'
+attribute posthog.capture_v1.RESULT_OK = 'ok'
+attribute posthog.capture_v1.RESULT_RETRY = 'retry'
+attribute posthog.capture_v1.RESULT_WARNING = 'warning'
+attribute posthog.capture_v1.RETRYABLE_STATUSES = frozenset({408, 500, 502, 503, 504})
+attribute posthog.capture_v1.TERMINAL_STATUSES = frozenset({400, 401, 402, 413, 415, 429})
 attribute posthog.client.Client.api_key = (project_api_key or '').strip()
 attribute posthog.client.Client.capture_exception_code_variables = capture_exception_code_variables
 attribute posthog.client.Client.capture_mode = resolve_capture_mode(capture_mode)
@@ -961,6 +972,8 @@ function posthog.alias(previous_id: str, distinct_id: str, timestamp: Optional[d
 function posthog.capture(event: str, **kwargs: Unpack[OptionalCaptureArgs]) -> Optional[str]
 function posthog.capture_exception(exception: Optional[ExceptionArg] = None, **kwargs: Unpack[OptionalCaptureArgs]) -> Optional[str]
 function posthog.capture_mode.resolve_capture_mode(capture_mode: Optional[Union[CaptureMode, str]] = None) -> CaptureMode
+function posthog.capture_v1.build_v1_batch_body(events: list[dict], historical_migration: bool = False) -> dict
+function posthog.capture_v1.to_v1_event(msg: dict) -> dict
 function posthog.client.add_context_tags(properties)
 function posthog.client.get_identity_state(passed) -> tuple[str, bool]
 function posthog.client.no_throw(default_return=None)
@@ -1309,6 +1322,7 @@ module posthog.ai.utils
 module posthog.args
 module posthog.bucketed_rate_limiter
 module posthog.capture_mode
+module posthog.capture_v1
 module posthog.client
 module posthog.consumer
 module posthog.contexts