diff --git a/posthog/capture_compression.py b/posthog/capture_compression.py new file mode 100644 index 00000000..6d7eff6d --- /dev/null +++ b/posthog/capture_compression.py @@ -0,0 +1,90 @@ +import logging +import os +from enum import Enum +from typing import Optional, Union + +log = logging.getLogger("posthog") + +CAPTURE_COMPRESSION_ENV_VAR = "POSTHOG_CAPTURE_COMPRESSION" + + +class CaptureCompression(str, Enum): + """Selects the request-body compression for capture-v1 uploads. + + Only honored when ``capture_mode`` is ``V1``; the legacy ``/batch/`` path + keeps using its own ``gzip`` flag. ``NONE`` sends the body uncompressed. + ``GZIP`` and ``DEFLATE`` (zlib, RFC 1950) are both stdlib / zero-dependency + and map to the matching ``Content-Encoding`` token the v1 server decodes + (``br``/``zstd`` are accepted by the server too but need extra dependencies, + so they are intentionally left out for now). Inheriting from ``str`` keeps + the members comparable to and serializable as their token values. + """ + + NONE = "none" + GZIP = "gzip" + DEFLATE = "deflate" + + +# Accepted spellings for both the kwarg and the env var. ``identity`` mirrors +# the HTTP token for "no encoding". +_ALIASES: dict[str, CaptureCompression] = { + "none": CaptureCompression.NONE, + "identity": CaptureCompression.NONE, + "gzip": CaptureCompression.GZIP, + "deflate": CaptureCompression.DEFLATE, +} + + +def _coerce_explicit( + value: Union[CaptureCompression, str], +) -> CaptureCompression: + """Normalize an explicitly-supplied compression to a ``CaptureCompression``. + + An explicit but unrecognized value is a programming error, so it raises + ``ValueError`` rather than silently defaulting (unlike the env var, which is + operator-supplied and defaults defensively). + """ + if isinstance(value, CaptureCompression): + return value + if isinstance(value, str): + resolved = _ALIASES.get(value.strip().lower()) + if resolved is not None: + return resolved + raise ValueError( + f"invalid capture_compression {value!r}; expected a CaptureCompression " + f"or one of {sorted(_ALIASES)}" + ) + + +def resolve_capture_compression( + capture_compression: Optional[Union[CaptureCompression, str]] = None, + *, + gzip_fallback: bool = False, +) -> CaptureCompression: + """Resolve the effective v1 compression. + + Precedence: explicit ``capture_compression`` argument > + ``POSTHOG_CAPTURE_COMPRESSION`` env var > the legacy ``gzip`` flag + (``GZIP`` when set) > ``NONE``. An unrecognized env value logs a warning and + falls back to the ``gzip`` flag, so a typo never silently changes encoding. + """ + if capture_compression is not None: + return _coerce_explicit(capture_compression) + + fallback = CaptureCompression.GZIP if gzip_fallback else CaptureCompression.NONE + + raw = os.environ.get(CAPTURE_COMPRESSION_ENV_VAR) + if raw is None or raw.strip() == "": + return fallback + + resolved = _ALIASES.get(raw.strip().lower()) + if resolved is None: + log.warning( + "Unrecognized %s=%r; falling back to %s. Expected one of %s.", + CAPTURE_COMPRESSION_ENV_VAR, + raw, + fallback.value, + sorted(_ALIASES), + ) + return fallback + return resolved diff --git a/posthog/capture_v1.py b/posthog/capture_v1.py index 20c26714..70eb42fa 100644 --- a/posthog/capture_v1.py +++ b/posthog/capture_v1.py @@ -1,9 +1,9 @@ -"""Pure serialization helpers for the Capture V1 wire protocol. +"""Serialization and transport for the Capture V1 wire protocol. -This module holds the *transform* layer for ``POST /i/v1/analytics/events``: -turning a legacy-shaped queued message into a v1 wire event, and assembling the -v1 batch envelope. It performs no I/O — the HTTP transport and partial-retry -logic live alongside it but are added separately. +This module owns everything specific to ``POST /i/v1/analytics/events``: the +*transform* layer (legacy-shaped queued message -> v1 wire event + batch +envelope) and the *transport* layer (a single HTTP attempt, response parsing, +and the partial-retry send loop). The v1 contract (see ``rust/capture/src/v1/analytics/types.rs``) differs from the legacy ``/batch/`` shape in a few load-bearing ways that this module @@ -17,13 +17,48 @@ the top level, so they are relocated into ``properties`` here. - ``$lib``/``$lib_version`` are injected server-side from the required ``PostHog-Sdk-Info`` header and are stripped from v1 properties. + +The response is per-event: a 200 carries a ``results`` map keyed by event uuid, +each tagged ``ok``/``warning`` (terminal-success), ``drop`` (terminal-failure), +or ``retry``. :func:`send_v1_batch` resends only the ``retry`` events on the next +attempt, holding the ``PostHog-Request-Id`` and batch ``created_at`` stable +across attempts while incrementing ``PostHog-Attempt``. ``ok``/``warning``/absent +events succeed; ``drop`` and retry-exhaustion are carried on the +:class:`CaptureV1Error` raised on batch-level/terminal failure, so the consumer's +existing ``on_error(exc, batch)`` path surfaces them unchanged (no per-event +logging of its own). + +Request bodies are optionally compressed per :class:`~posthog.capture_compression.CaptureCompression` +(``gzip`` or zlib-wrapped ``deflate``), advertised via ``Content-Encoding``. """ +import json +import logging +import time +import zlib from collections.abc import Callable +from dataclasses import dataclass from datetime import datetime, timezone -from typing import Any, Optional +from email.utils import parsedate_to_datetime +from gzip import GzipFile +from io import BytesIO +from typing import TYPE_CHECKING, Any, Optional +from uuid import uuid4 + +from posthog.capture_compression import CaptureCompression +from posthog.request import ( + DatetimeSerializer, + USER_AGENT, + APIError, + _get_session, + normalize_host, +) +from posthog.utils import guess_timezone as _guess_timezone, remove_trailing_slash -from posthog.utils import guess_timezone as _guess_timezone +if TYPE_CHECKING: + import requests + +log = logging.getLogger("posthog") CAPTURE_V1_PATH = "/i/v1/analytics/events" @@ -170,16 +205,356 @@ def to_v1_event(msg: dict) -> dict: return event -def build_v1_batch_body(events: list[dict], historical_migration: bool = False) -> dict: +def build_v1_batch_body( + events: list[dict], + historical_migration: bool = False, + created_at: Optional[str] = None, +) -> dict: """Assemble the v1 batch envelope. Carries no ``api_key`` (Bearer auth) and no ``sent_at``. ``historical_migration`` is omitted when False (the server defaults it). + ``created_at`` defaults to now in UTC; :func:`send_v1_batch` passes a value + hoisted once so it stays stable across retry attempts. """ body: dict[str, Any] = { - "created_at": datetime.now(timezone.utc).isoformat(), + "created_at": created_at or datetime.now(timezone.utc).isoformat(), "batch": events, } if historical_migration: body["historical_migration"] = True return body + + +@dataclass +class V1EventResult: + """A single event's directive from a 2xx ``results`` map.""" + + result: Optional[str] + details: Optional[str] = None + + +@dataclass +class V1ParsedResponse: + """Classified outcome of one v1 HTTP attempt. + + ``is_success`` is the 2xx classification. On success ``results`` holds the + per-uuid directives (``None``/``malformed=True`` when the body could not be + parsed — treated as terminal so a bad success never loops forever). On a + non-2xx, ``error_message`` is the best-effort human-readable detail. + """ + + status_code: int + is_success: bool + retry_after: Optional[float] = None + results: Optional[dict[str, V1EventResult]] = None + malformed: bool = False + error_message: str = "" + + +class CaptureV1Error(APIError): + """Batch-level failure of a capture-v1 send. + + Subclasses :class:`APIError` so the consumer's existing ``on_error`` handling + (which already inspects ``status``/``retry_after``) keeps working; the extra + fields carry v1 specifics for richer logging/callbacks. + """ + + def __init__( + self, + status: int | str, + message: str, + *, + retry_after: Optional[float] = None, + request_id: Optional[str] = None, + attempts: Optional[int] = None, + retry_exhausted: Optional[list[str]] = None, + drops: Optional[list[tuple[str, Optional[str]]]] = None, + ): + super().__init__(status, message, retry_after=retry_after) + self.request_id = request_id + self.attempts = attempts + # uuids the server told us to retry but we never delivered (exhausted). + self.retry_exhausted = retry_exhausted or [] + # (uuid, details) pairs the server told us to drop on a 2xx response. + self.drops = drops or [] + + +def _is_success_status(status: int) -> bool: + return 200 <= status < 300 + + +def _parse_retry_after(header_value: Optional[str]) -> Optional[float]: + """Parse a ``Retry-After`` header (delta-seconds or HTTP-date) to seconds.""" + if not header_value: + return None + try: + return float(header_value) + except (ValueError, TypeError): + pass + try: + delta = parsedate_to_datetime(header_value) - datetime.now(timezone.utc) + return max(0.0, delta.total_seconds()) + except (ValueError, TypeError): + return None + + +def _compress_v1( + compression: CaptureCompression, data: str +) -> tuple[str | bytes, Optional[str]]: + """Compress a v1 request body, returning ``(body, Content-Encoding token)``. + + ``GZIP`` emits a gzip stream; ``DEFLATE`` emits a *zlib-wrapped* deflate + stream (RFC 1950, leading ``0x78``) to match posthog-go / posthog-rs and the + server's zlib decoder for ``Content-Encoding: deflate`` — raw, headerless + deflate would be misrouted. ``NONE`` returns the string body and no token. + """ + if compression == CaptureCompression.GZIP: + buf = BytesIO() + with GzipFile(fileobj=buf, mode="w") as gz: + # `data` is produced by json.dumps(), whose default encoding is utf-8. + gz.write(data.encode("utf-8")) + return buf.getvalue(), "gzip" + if compression == CaptureCompression.DEFLATE: + return zlib.compress(data.encode("utf-8")), "deflate" + return data, None + + +def post_v1( + api_key: str, + host: Optional[str], + batch_body: dict, + *, + attempt: int, + request_id: str, + compression: CaptureCompression = CaptureCompression.NONE, + timeout: int = 15, + session: Optional["requests.Session"] = None, +) -> "requests.Response": + """Perform a single ``POST /i/v1/analytics/events`` attempt. + + Bearer-authed (no ``api_key`` in the body) with the required v1 headers. + ``attempt`` (1-based) and the stable ``request_id`` are echoed via + ``PostHog-Attempt``/``PostHog-Request-Id`` so the backend can correlate + retries. The body is compressed per ``compression`` (advertised via + ``Content-Encoding``). Returns the raw response; classification is left to + the caller. + """ + trimmed_host = remove_trailing_slash(normalize_host(host)) + url = trimmed_host + CAPTURE_V1_PATH + data = json.dumps(batch_body, cls=DatetimeSerializer) + headers = { + "Content-Type": "application/json", + "User-Agent": USER_AGENT, + "Authorization": f"Bearer {api_key}", + HEADER_SDK_INFO: USER_AGENT, + HEADER_ATTEMPT: str(attempt), + HEADER_REQUEST_ID: request_id, + HEADER_REQUEST_TIMESTAMP: datetime.now(timezone.utc).isoformat(), + } + body, encoding = _compress_v1(compression, data) + if encoding is not None: + headers["Content-Encoding"] = encoding + + log.debug("capture v1 POST %s attempt=%s request_id=%s", url, attempt, request_id) + return (session or _get_session()).post( + url, data=body, headers=headers, timeout=timeout + ) + + +def parse_v1_response(res: "requests.Response") -> V1ParsedResponse: + """Read and classify a v1 response without raising.""" + status = res.status_code + retry_after = _parse_retry_after(res.headers.get("Retry-After")) + + if _is_success_status(status): + try: + payload = res.json() + raw_results = payload["results"] + results = { + uid: V1EventResult( + result=(r or {}).get("result"), + details=(r or {}).get("details"), + ) + for uid, r in raw_results.items() + } + return V1ParsedResponse(status, True, retry_after, results=results) + except (ValueError, KeyError, AttributeError, TypeError): + # 2xx with a body we can't read as a results map: terminal, so we + # don't loop forever re-sending against a broken success. + return V1ParsedResponse(status, True, retry_after, malformed=True) + + message = "" + try: + payload = res.json() + if isinstance(payload, dict): + message = ( + payload.get("error_description") + or payload.get("error") + or payload.get("detail") + or "" + ) + except (ValueError, AttributeError): + pass + if not message: + message = res.text or f"capture v1 request failed with status {status}" + return V1ParsedResponse(status, False, retry_after, error_message=message) + + +def _backoff(attempt_index: int, retry_after: Optional[float]) -> None: + """Sleep before the next attempt: ``Retry-After`` wins, else exp backoff. + + Mirrors the legacy ``Consumer._send`` policy so both wire protocols back off + identically (capped at 30s). + """ + if retry_after and retry_after > 0: + time.sleep(retry_after) + else: + time.sleep(min(2**attempt_index, 30)) + + +def _log_result_summary( + request_id: str, attempt: int, results: dict[str, V1EventResult] +) -> None: + tally = {RESULT_OK: 0, RESULT_WARNING: 0, RESULT_DROP: 0, RESULT_RETRY: 0} + other = 0 + for r in results.values(): + if r.result in tally: + tally[r.result] += 1 + else: + other += 1 + log.debug( + "capture v1 response request_id=%s attempt=%s events=%d ok=%d warning=%d drop=%d retry=%d other=%d", + request_id, + attempt, + len(results), + tally[RESULT_OK], + tally[RESULT_WARNING], + tally[RESULT_DROP], + tally[RESULT_RETRY], + other, + ) + + +def send_v1_batch( + api_key: str, + host: Optional[str], + batch: list[dict], + *, + compression: CaptureCompression = CaptureCompression.NONE, + timeout: int = 15, + max_retries: int = 3, + historical_migration: bool = False, + session: Optional["requests.Session"] = None, +) -> None: + """Deliver ``batch`` to the v1 endpoint with partial retry. + + The v1 sibling of ``Consumer._send``: it loops up to ``max_retries + 1`` + attempts, but unlike v0 it shrinks the batch to only the events the server + tagged ``retry`` after each 2xx. ``ok``/``warning``/absent events succeed; a + server-chosen ``drop`` on an otherwise-successful request is not a delivery + failure, so it is not raised or logged per-event (the DEBUG summary tallies + it). Raises :class:`CaptureV1Error` (or the underlying transport exception) + on a batch-level terminal/transport failure or once retries are exhausted — + carrying any ``drops`` and exhausted uuids — so the caller's ``on_error`` + fires unchanged. ``request_id`` and the batch ``created_at`` are stable + across attempts; ``PostHog-Attempt`` increments. + """ + request_id = str(uuid4()) + # Hoisted once so the batch envelope is byte-identical across retry attempts + # (only the events list shrinks and the attempt header increments). + created_at = datetime.now(timezone.utc).isoformat() + pending_events = [to_v1_event(m) for m in batch] + pending_uuids = [e["uuid"] for e in pending_events] + last_exc: Optional[Exception] = None + + for attempt_index in range(max_retries + 1): + attempt = attempt_index + 1 + last_attempt = attempt_index == max_retries + body = build_v1_batch_body( + pending_events, historical_migration, created_at=created_at + ) + + try: + res = post_v1( + api_key, + host, + body, + attempt=attempt, + request_id=request_id, + compression=compression, + timeout=timeout, + session=session, + ) + except Exception as e: + # Transport-level failure (connection/timeout): retry like v0 does. + last_exc = e + if last_attempt: + raise + _backoff(attempt_index, None) + continue + + parsed = parse_v1_response(res) + + if parsed.is_success: + if parsed.malformed: + raise CaptureV1Error( + parsed.status_code, + "capture v1 returned a success status with an unparseable body", + request_id=request_id, + attempts=attempt, + ) + results = parsed.results or {} + _log_result_summary(request_id, attempt, results) + + retry_events: list[dict] = [] + retry_uuids: list[str] = [] + drops: list[tuple[str, Optional[str]]] = [] + for event, uid in zip(pending_events, pending_uuids): + directive = results.get(uid) + if directive is None: + # Absent from the map: treated as accepted (matches posthog-rs). + continue + if directive.result == RESULT_RETRY: + retry_events.append(event) + retry_uuids.append(uid) + elif directive.result == RESULT_DROP: + drops.append((uid, directive.details)) + # ok / warning / unrecognized -> terminal success. + + if not retry_uuids: + return + if last_attempt: + raise CaptureV1Error( + parsed.status_code, + f"{len(retry_uuids)} event(s) still pending retry after {attempt} attempt(s)", + request_id=request_id, + attempts=attempt, + retry_exhausted=retry_uuids, + drops=drops, + ) + pending_events, pending_uuids = retry_events, retry_uuids + _backoff(attempt_index, parsed.retry_after) + continue + + # Non-2xx. Retryable transient statuses back off; everything else + # (400/401/402/413/415/429/...) is terminal. + v1_error = CaptureV1Error( + parsed.status_code, + parsed.error_message, + retry_after=parsed.retry_after, + request_id=request_id, + attempts=attempt, + ) + if parsed.status_code in RETRYABLE_STATUSES: + last_exc = v1_error + if last_attempt: + raise v1_error + _backoff(attempt_index, parsed.retry_after) + continue + raise v1_error + + # Unreachable in practice (every branch returns or continues), but keeps the + # function total if max_retries is somehow negative. + if last_exc: + raise last_exc diff --git a/posthog/test/test_capture_compression.py b/posthog/test/test_capture_compression.py new file mode 100644 index 00000000..70c5f0b4 --- /dev/null +++ b/posthog/test/test_capture_compression.py @@ -0,0 +1,100 @@ +import os +import unittest +from unittest import mock + +from parameterized import parameterized + +from posthog.capture_compression import ( + CAPTURE_COMPRESSION_ENV_VAR, + CaptureCompression, + resolve_capture_compression, +) +from posthog.test.logging_helpers import capture_message_only_logs + + +class TestResolveCaptureCompression(unittest.TestCase): + def setUp(self) -> None: + patcher = mock.patch.dict(os.environ, {}, clear=False) + patcher.start() + self.addCleanup(patcher.stop) + os.environ.pop(CAPTURE_COMPRESSION_ENV_VAR, None) + + def test_defaults_to_none_with_no_kwarg_env_or_gzip(self) -> None: + self.assertIs(resolve_capture_compression(None), CaptureCompression.NONE) + + def test_gzip_fallback_used_when_nothing_else_set(self) -> None: + self.assertIs( + resolve_capture_compression(None, gzip_fallback=True), + CaptureCompression.GZIP, + ) + + @parameterized.expand( + [ + ("enum_gzip", CaptureCompression.GZIP, CaptureCompression.GZIP), + ("enum_deflate", CaptureCompression.DEFLATE, CaptureCompression.DEFLATE), + ("enum_none", CaptureCompression.NONE, CaptureCompression.NONE), + ("str_gzip", "gzip", CaptureCompression.GZIP), + ("str_deflate", "deflate", CaptureCompression.DEFLATE), + ("str_none", "none", CaptureCompression.NONE), + ("str_identity_alias", "identity", CaptureCompression.NONE), + ("str_upper_and_padded", " GZIP ", CaptureCompression.GZIP), + ] + ) + def test_explicit_kwarg_takes_precedence_and_coerces( + self, _name, kwarg, expected + ) -> None: + # Env names a different value and gzip_fallback is on, so each row proves + # the explicit kwarg wins over both lower-precedence sources. + with mock.patch.dict(os.environ, {CAPTURE_COMPRESSION_ENV_VAR: "deflate"}): + self.assertIs( + resolve_capture_compression(kwarg, gzip_fallback=True), expected + ) + + def test_invalid_kwarg_raises_even_with_valid_env(self) -> None: + with mock.patch.dict(os.environ, {CAPTURE_COMPRESSION_ENV_VAR: "gzip"}): + with self.assertRaises(ValueError): + resolve_capture_compression("bogus") + + @parameterized.expand([("bad_str", "bogus"), ("wrong_type", 1)]) + def test_invalid_explicit_kwarg_raises(self, _name, value) -> None: + with self.assertRaises(ValueError): + resolve_capture_compression(value) + + @parameterized.expand( + [ + ("gzip", "gzip", CaptureCompression.GZIP), + ("deflate", "deflate", CaptureCompression.DEFLATE), + ("none", "none", CaptureCompression.NONE), + ("identity", "identity", CaptureCompression.NONE), + ("uppercase", "GZIP", CaptureCompression.GZIP), + ("padded", " deflate ", CaptureCompression.DEFLATE), + ] + ) + def test_env_var_resolution(self, _name, env_value, expected) -> None: + with mock.patch.dict(os.environ, {CAPTURE_COMPRESSION_ENV_VAR: env_value}): + self.assertIs(resolve_capture_compression(None), expected) + + def test_env_var_takes_precedence_over_gzip_fallback(self) -> None: + with mock.patch.dict(os.environ, {CAPTURE_COMPRESSION_ENV_VAR: "deflate"}): + self.assertIs( + resolve_capture_compression(None, gzip_fallback=True), + CaptureCompression.DEFLATE, + ) + + @parameterized.expand([("empty", ""), ("whitespace", " ")]) + def test_blank_env_var_falls_through_to_fallback(self, _name, env_value) -> None: + with mock.patch.dict(os.environ, {CAPTURE_COMPRESSION_ENV_VAR: env_value}): + self.assertIs(resolve_capture_compression(None), CaptureCompression.NONE) + self.assertIs( + resolve_capture_compression(None, gzip_fallback=True), + CaptureCompression.GZIP, + ) + + def test_unrecognized_env_var_warns_and_uses_fallback(self) -> None: + with mock.patch.dict(os.environ, {CAPTURE_COMPRESSION_ENV_VAR: "bogus"}): + with capture_message_only_logs() as stream: + self.assertIs( + resolve_capture_compression(None, gzip_fallback=True), + CaptureCompression.GZIP, + ) + self.assertIn("bogus", stream.getvalue()) diff --git a/posthog/test/test_capture_v1.py b/posthog/test/test_capture_v1.py index 8e855533..740766b7 100644 --- a/posthog/test/test_capture_v1.py +++ b/posthog/test/test_capture_v1.py @@ -1,16 +1,124 @@ +import json import unittest +import zlib from datetime import datetime, timezone +from unittest import mock from parameterized import parameterized +from posthog.capture_compression import CaptureCompression from posthog.capture_v1 import ( + CAPTURE_V1_PATH, + HEADER_ATTEMPT, + HEADER_REQUEST_ID, + HEADER_SDK_INFO, + CaptureV1Error, build_v1_batch_body, + parse_v1_response, + post_v1, + send_v1_batch, to_v1_event, _coerce_bool, _coerce_str, ) +class _FakeResponse: + """Minimal stand-in for ``requests.Response`` for transport tests.""" + + def __init__( + self, status_code, *, json_body=None, headers=None, text="", raise_json=False + ): + self.status_code = status_code + self.headers = headers or {} + self.text = text + self._json_body = json_body + self._raise_json = raise_json + + def json(self): + if self._raise_json: + raise ValueError("no json") + return self._json_body + + +class _RecordingSession: + """Captures the args of a single ``.post`` and returns a canned response.""" + + def __init__(self, response): + self._response = response + self.calls = [] + + def post(self, url, data=None, headers=None, timeout=None): + self.calls.append( + {"url": url, "data": data, "headers": headers, "timeout": timeout} + ) + return self._response + + +class _PostV1Stub: + """Drop-in for ``post_v1`` that records calls and replays canned outcomes. + + Each item in ``outcomes`` is either a ``_FakeResponse`` to return or an + ``Exception`` instance to raise (simulating a transport failure). + """ + + def __init__(self, outcomes): + self._outcomes = list(outcomes) + self.calls = [] + + def __call__( + self, + api_key, + host, + batch_body, + *, + attempt, + request_id, + compression=CaptureCompression.NONE, + timeout=15, + session=None, + ): + self.calls.append( + { + "attempt": attempt, + "request_id": request_id, + "compression": compression, + "created_at": batch_body["created_at"], + "uuids": [e["uuid"] for e in batch_body["batch"]], + } + ) + outcome = self._outcomes[len(self.calls) - 1] + if isinstance(outcome, Exception): + raise outcome + return outcome + + +def _msg(uuid, event="e", **overrides): + msg = { + "event": event, + "uuid": uuid, + "distinct_id": "user-1", + "timestamp": "2026-06-27T12:00:00+00:00", + "type": "capture", + "properties": {}, + } + msg.update(overrides) + return msg + + +def _results_response(directives, headers=None): + """200 response whose ``results`` map tags each uuid. + + ``directives`` maps uuid -> ``"ok"`` (or any result string) or a + ``(result, details)`` tuple. + """ + results = {} + for uid, spec in directives.items(): + result, details = spec if isinstance(spec, tuple) else (spec, None) + results[uid] = {"result": result, "details": details} + return _FakeResponse(200, json_body={"results": results}, headers=headers) + + def _legacy_msg(event="my_event", properties=None, **overrides) -> dict: """Minimal legacy-shaped message as it looks coming off the queue.""" msg = { @@ -243,9 +351,256 @@ def test_created_at_is_tz_aware_rfc3339(self) -> None: parsed = datetime.fromisoformat(body["created_at"]) self.assertIsNotNone(parsed.tzinfo) + def test_created_at_passthrough_used_verbatim(self) -> None: + # send_v1_batch hoists created_at and passes it in so it stays stable + # across retry attempts. + body = build_v1_batch_body([], created_at="2026-06-27T12:00:00+00:00") + self.assertEqual(body["created_at"], "2026-06-27T12:00:00+00:00") + def test_historical_migration_omitted_when_false(self) -> None: self.assertNotIn("historical_migration", build_v1_batch_body([])) def test_historical_migration_present_when_true(self) -> None: body = build_v1_batch_body([], historical_migration=True) self.assertIs(body["historical_migration"], True) + + +class TestPostV1(unittest.TestCase): + def _post(self, response, **kwargs): + session = _RecordingSession(response) + body = build_v1_batch_body([to_v1_event(_msg("u-1"))]) + post_v1( + "phc_key", + "https://app.posthog.com/", + body, + attempt=2, + request_id="req-123", + session=session, + **kwargs, + ) + return session.calls[0] + + def test_url_uses_v1_path_and_trims_host(self) -> None: + call = self._post(_results_response({})) + self.assertEqual(call["url"], "https://app.posthog.com" + CAPTURE_V1_PATH) + + def test_required_headers_present(self) -> None: + headers = self._post(_results_response({}))["headers"] + self.assertEqual(headers["Authorization"], "Bearer phc_key") + self.assertEqual(headers[HEADER_ATTEMPT], "2") + self.assertEqual(headers[HEADER_REQUEST_ID], "req-123") + self.assertTrue(headers[HEADER_SDK_INFO].startswith("posthog-python/")) + self.assertEqual(headers["Content-Type"], "application/json") + + def test_no_api_key_in_body(self) -> None: + # v1 authenticates via the Bearer header; the key must not leak into the body. + data = self._post(_results_response({}))["data"] + self.assertNotIn("phc_key", data) + self.assertNotIn("api_key", json.loads(data)) + + def test_uncompressed_body_is_json_str_without_encoding_header(self) -> None: + call = self._post(_results_response({}), compression=CaptureCompression.NONE) + self.assertIsInstance(call["data"], str) + self.assertNotIn("Content-Encoding", call["headers"]) + + def test_gzip_sets_encoding_header_and_compresses_body(self) -> None: + call = self._post(_results_response({}), compression=CaptureCompression.GZIP) + self.assertEqual(call["headers"]["Content-Encoding"], "gzip") + self.assertIsInstance(call["data"], bytes) + self.assertEqual(call["data"][:2], b"\x1f\x8b") # gzip magic + + def test_deflate_sets_encoding_header_and_zlib_wraps_body(self) -> None: + # Must be zlib-wrapped (RFC 1950, 0x78 prefix), matching posthog-go / + # posthog-rs, so the server routes Content-Encoding: deflate to its zlib + # decoder rather than treating it as raw deflate. + call = self._post(_results_response({}), compression=CaptureCompression.DEFLATE) + self.assertEqual(call["headers"]["Content-Encoding"], "deflate") + self.assertIsInstance(call["data"], bytes) + self.assertEqual(call["data"][0], 0x78) # zlib header + roundtripped = zlib.decompress(call["data"]).decode("utf-8") + self.assertNotIn("api_key", json.loads(roundtripped)) + + +class TestParseV1Response(unittest.TestCase): + def test_success_parses_results_with_details(self) -> None: + res = _FakeResponse( + 200, + json_body={"results": {"u-1": {"result": "drop", "details": "spam"}}}, + ) + parsed = parse_v1_response(res) + self.assertTrue(parsed.is_success) + self.assertEqual(parsed.results["u-1"].result, "drop") + self.assertEqual(parsed.results["u-1"].details, "spam") + + @parameterized.expand( + [ + ("unparseable_body", _FakeResponse(200, raise_json=True)), + ("missing_results_key", _FakeResponse(200, json_body={"foo": 1})), + ] + ) + def test_success_with_bad_body_is_malformed(self, _name, res) -> None: + parsed = parse_v1_response(res) + self.assertTrue(parsed.is_success) + self.assertTrue(parsed.malformed) + + @parameterized.expand( + [ + ("error_description", {"error_description": "bad batch"}, "bad batch"), + ("error", {"error": "validation_error"}, "validation_error"), + ("detail", {"detail": "nope"}, "nope"), + ] + ) + def test_error_message_extracted_from_body(self, _name, body, expected) -> None: + parsed = parse_v1_response(_FakeResponse(400, json_body=body)) + self.assertFalse(parsed.is_success) + self.assertEqual(parsed.error_message, expected) + + def test_error_message_falls_back_to_text(self) -> None: + parsed = parse_v1_response(_FakeResponse(400, raise_json=True, text="boom")) + self.assertEqual(parsed.error_message, "boom") + + @parameterized.expand([("numeric", "2", 2.0), ("absent", None, None)]) + def test_retry_after_header(self, _name, header_value, expected) -> None: + headers = {"Retry-After": header_value} if header_value is not None else {} + parsed = parse_v1_response(_FakeResponse(503, headers=headers)) + self.assertEqual(parsed.retry_after, expected) + + +class TestSendV1Batch(unittest.TestCase): + """Drives ``send_v1_batch`` with a stubbed ``post_v1`` and no real sleeps.""" + + def setUp(self) -> None: + sleep_patch = mock.patch("posthog.capture_v1.time.sleep") + self.sleep = sleep_patch.start() + self.addCleanup(sleep_patch.stop) + + def _run(self, batch, outcomes, **kwargs): + stub = _PostV1Stub(outcomes) + with mock.patch("posthog.capture_v1.post_v1", stub): + send_v1_batch("phc_key", "https://app.posthog.com", batch, **kwargs) + return stub + + def _run_expecting_error(self, batch, outcomes, **kwargs): + stub = _PostV1Stub(outcomes) + with mock.patch("posthog.capture_v1.post_v1", stub): + with self.assertRaises(CaptureV1Error) as ctx: + send_v1_batch("phc_key", "https://app.posthog.com", batch, **kwargs) + return stub, ctx.exception + + def test_all_ok_sends_once(self) -> None: + stub = self._run([_msg("u-1")], [_results_response({"u-1": "ok"})]) + self.assertEqual(len(stub.calls), 1) + self.sleep.assert_not_called() + + def test_absent_uuid_treated_as_accepted(self) -> None: + # Empty results map: the event is neither retried nor errored. + stub = self._run([_msg("u-1")], [_results_response({})]) + self.assertEqual(len(stub.calls), 1) + + def test_partial_retry_resends_only_retry_events(self) -> None: + batch = [_msg("u-ok"), _msg("u-retry")] + stub = self._run( + batch, + [ + _results_response({"u-ok": "ok", "u-retry": "retry"}), + _results_response({"u-retry": "ok"}), + ], + ) + self.assertEqual(len(stub.calls), 2) + self.assertEqual(stub.calls[0]["uuids"], ["u-ok", "u-retry"]) + # Second attempt carries only the event the server asked to retry. + self.assertEqual(stub.calls[1]["uuids"], ["u-retry"]) + + def test_request_id_and_created_at_stable_attempt_increments(self) -> None: + stub = self._run( + [_msg("u-1")], + [ + _results_response({"u-1": "retry"}), + _results_response({"u-1": "ok"}), + ], + ) + self.assertEqual(stub.calls[0]["request_id"], stub.calls[1]["request_id"]) + # created_at is hoisted once, so the envelope timestamp is identical + # across retry attempts (only the attempt header increments). + self.assertEqual(stub.calls[0]["created_at"], stub.calls[1]["created_at"]) + self.assertEqual([c["attempt"] for c in stub.calls], [1, 2]) + + def test_compression_forwarded_to_post_v1(self) -> None: + stub = self._run( + [_msg("u-1")], + [_results_response({"u-1": "ok"})], + compression=CaptureCompression.DEFLATE, + ) + self.assertEqual(stub.calls[0]["compression"], CaptureCompression.DEFLATE) + + def test_drop_on_2xx_is_accepted_not_raised_or_logged(self) -> None: + # A server-chosen drop on a successful request is not a delivery failure: + # the send completes without raising and without per-event WARNING noise + # (drops surface via CaptureV1Error only on batch-level failure). + batch = [_msg("u-ok"), _msg("u-drop")] + with self.assertNoLogs("posthog", level="WARNING"): + stub = self._run( + batch, + [_results_response({"u-ok": "ok", "u-drop": ("drop", "invalid")})], + ) + self.assertEqual(len(stub.calls), 1) + + def test_retry_exhausted_raises_with_uuids(self) -> None: + stub, exc = self._run_expecting_error( + [_msg("u-1")], + [_results_response({"u-1": "retry"}), _results_response({"u-1": "retry"})], + max_retries=1, + ) + self.assertEqual(len(stub.calls), 2) + self.assertEqual(exc.retry_exhausted, ["u-1"]) + + def test_malformed_2xx_is_terminal(self) -> None: + stub, exc = self._run_expecting_error( + [_msg("u-1")], [_FakeResponse(200, raise_json=True)] + ) + self.assertEqual(len(stub.calls), 1) + self.assertEqual(exc.status, 200) + + @parameterized.expand([("bad_request", 400), ("rate_limited", 429)]) + def test_terminal_status_raises_immediately(self, _name, status) -> None: + stub, exc = self._run_expecting_error( + [_msg("u-1")], + [_FakeResponse(status, json_body={"error": "nope"})], + max_retries=2, + ) + self.assertEqual(len(stub.calls), 1) # not retried + self.assertEqual(exc.status, status) + + def test_retryable_status_then_success(self) -> None: + stub = self._run( + [_msg("u-1")], + [ + _FakeResponse(503, headers={"Retry-After": "2"}), + _results_response({"u-1": "ok"}), + ], + ) + self.assertEqual(len(stub.calls), 2) + self.sleep.assert_called_once_with(2.0) # honored Retry-After + + def test_retryable_status_exhausted_raises(self) -> None: + stub, exc = self._run_expecting_error( + [_msg("u-1")], [_FakeResponse(503), _FakeResponse(503)], max_retries=1 + ) + self.assertEqual(len(stub.calls), 2) + self.assertEqual(exc.status, 503) + + def test_transport_error_then_success(self) -> None: + stub = self._run( + [_msg("u-1")], + [ConnectionError("boom"), _results_response({"u-1": "ok"})], + ) + self.assertEqual(len(stub.calls), 2) + + def test_transport_error_exhausted_reraises_original(self) -> None: + stub = _PostV1Stub([ConnectionError("boom"), ConnectionError("boom")]) + with mock.patch("posthog.capture_v1.post_v1", stub): + with self.assertRaises(ConnectionError): + send_v1_batch( + "phc_key", "https://app.posthog.com", [_msg("u-1")], max_retries=1 + ) + self.assertEqual(len(stub.calls), 2) diff --git a/references/public_api_snapshot.txt b/references/public_api_snapshot.txt index f4d9d0b7..613169f7 100644 --- a/references/public_api_snapshot.txt +++ b/references/public_api_snapshot.txt @@ -215,6 +215,12 @@ alias posthog.ai.utils.tag -> posthog.tag alias posthog.ai.utils.warn_if_posthog_ai_gateway -> posthog.ai.gateway.warn_if_posthog_ai_gateway alias posthog.args.FeatureFlagEvaluations -> posthog.feature_flag_evaluations.FeatureFlagEvaluations alias posthog.args.SendFeatureFlagsOptions -> posthog.types.SendFeatureFlagsOptions +alias posthog.capture_v1.APIError -> posthog.request.APIError +alias posthog.capture_v1.CaptureCompression -> posthog.capture_compression.CaptureCompression +alias posthog.capture_v1.DatetimeSerializer -> posthog.request.DatetimeSerializer +alias posthog.capture_v1.USER_AGENT -> posthog.request.USER_AGENT +alias posthog.capture_v1.normalize_host -> posthog.request.normalize_host +alias posthog.capture_v1.remove_trailing_slash -> posthog.utils.remove_trailing_slash alias posthog.client.AI_EVENTS_ENDPOINT -> posthog.request.AI_EVENTS_ENDPOINT alias posthog.client.APIError -> posthog.request.APIError alias posthog.client.CaptureMode -> posthog.capture_mode.CaptureMode @@ -474,12 +480,21 @@ attribute posthog.before_send = None attribute posthog.bucketed_rate_limiter.Number = Union[int, float] attribute posthog.bucketed_rate_limiter.ONE_DAY_IN_SECONDS = 86400.0 attribute posthog.bucketed_rate_limiter.log = logging.getLogger('posthog') +attribute posthog.capture_compression.CAPTURE_COMPRESSION_ENV_VAR = 'POSTHOG_CAPTURE_COMPRESSION' +attribute posthog.capture_compression.CaptureCompression.DEFLATE = 'deflate' +attribute posthog.capture_compression.CaptureCompression.GZIP = 'gzip' +attribute posthog.capture_compression.CaptureCompression.NONE = 'none' +attribute posthog.capture_compression.log = logging.getLogger('posthog') attribute posthog.capture_exception_code_variables = False attribute posthog.capture_mode.CAPTURE_MODE_ENV_VAR = 'POSTHOG_CAPTURE_MODE' attribute posthog.capture_mode.CaptureMode.V0 = 'v0' attribute posthog.capture_mode.CaptureMode.V1 = 'v1' attribute posthog.capture_mode.log = logging.getLogger('posthog') attribute posthog.capture_v1.CAPTURE_V1_PATH = '/i/v1/analytics/events' +attribute posthog.capture_v1.CaptureV1Error.attempts = attempts +attribute posthog.capture_v1.CaptureV1Error.drops = drops or [] +attribute posthog.capture_v1.CaptureV1Error.request_id = request_id +attribute posthog.capture_v1.CaptureV1Error.retry_exhausted = retry_exhausted or [] attribute posthog.capture_v1.HEADER_ATTEMPT = 'PostHog-Attempt' attribute posthog.capture_v1.HEADER_REQUEST_ID = 'PostHog-Request-Id' attribute posthog.capture_v1.HEADER_REQUEST_TIMESTAMP = 'PostHog-Request-Timestamp' @@ -490,6 +505,15 @@ attribute posthog.capture_v1.RESULT_RETRY = 'retry' attribute posthog.capture_v1.RESULT_WARNING = 'warning' attribute posthog.capture_v1.RETRYABLE_STATUSES = frozenset({408, 500, 502, 503, 504}) attribute posthog.capture_v1.TERMINAL_STATUSES = frozenset({400, 401, 402, 413, 415, 429}) +attribute posthog.capture_v1.V1EventResult.details: Optional[str] = None +attribute posthog.capture_v1.V1EventResult.result: Optional[str] +attribute posthog.capture_v1.V1ParsedResponse.error_message: str = '' +attribute posthog.capture_v1.V1ParsedResponse.is_success: bool +attribute posthog.capture_v1.V1ParsedResponse.malformed: bool = False +attribute posthog.capture_v1.V1ParsedResponse.results: Optional[dict[str, V1EventResult]] = None +attribute posthog.capture_v1.V1ParsedResponse.retry_after: Optional[float] = None +attribute posthog.capture_v1.V1ParsedResponse.status_code: int +attribute posthog.capture_v1.log = logging.getLogger('posthog') attribute posthog.client.Client.api_key = (project_api_key or '').strip() attribute posthog.client.Client.capture_exception_code_variables = capture_exception_code_variables attribute posthog.client.Client.capture_mode = resolve_capture_mode(capture_mode) @@ -847,7 +871,11 @@ class posthog.ai.types.ToolInProgress class posthog.args.OptionalCaptureArgs class posthog.args.OptionalSetArgs class posthog.bucketed_rate_limiter.BucketedRateLimiter(bucket_size: Number, refill_rate: Number, refill_interval_seconds: Number, on_bucket_rate_limited: Optional[Callable[[Hashable], None]] = None, clock: Callable[[], float] = time.monotonic) +class posthog.capture_compression.CaptureCompression class posthog.capture_mode.CaptureMode +class posthog.capture_v1.CaptureV1Error(status: int | str, message: str, *, retry_after: Optional[float] = None, request_id: Optional[str] = None, attempts: Optional[int] = None, retry_exhausted: Optional[list[str]] = None, drops: Optional[list[tuple[str, Optional[str]]]] = None) +class posthog.capture_v1.V1EventResult(result: Optional[str], details: Optional[str] = None) +class posthog.capture_v1.V1ParsedResponse(status_code: int, is_success: bool, retry_after: Optional[float] = None, results: Optional[dict[str, V1EventResult]] = None, malformed: bool = False, error_message: str = '') class posthog.client.Client(project_api_key: str, host=None, debug=False, max_queue_size=10000, send=True, on_error=None, flush_at=100, flush_interval=5.0, gzip=False, max_retries=3, sync_mode=False, timeout=15, thread=1, poll_interval=30, personal_api_key=None, disabled=False, disable_geoip=True, is_server=True, historical_migration=False, feature_flags_request_timeout_seconds=3, super_properties=None, enable_exception_autocapture=False, log_captured_exceptions=False, project_root=None, privacy_mode=False, before_send=None, flag_fallback_cache_url=None, enable_local_evaluation=True, flag_definition_cache_provider: Optional[FlagDefinitionCacheProvider] = None, capture_exception_code_variables=False, code_variables_mask_patterns=None, code_variables_ignore_patterns=None, code_variables_mask_url_credentials=None, code_variables_detect_secrets=None, in_app_modules: list[str] | None = None, enable_exception_autocapture_rate_limiting=False, exception_autocapture_bucket_size=ExceptionCapture.DEFAULT_BUCKET_SIZE, exception_autocapture_refill_rate=ExceptionCapture.DEFAULT_REFILL_RATE, exception_autocapture_refill_interval_seconds=ExceptionCapture.DEFAULT_REFILL_INTERVAL_SECONDS, capture_mode: Optional[Union[CaptureMode, str]] = None, _dedicated_ai_endpoint=False) class posthog.consumer.Consumer(queue, api_key, flush_at=100, host=None, on_error=None, flush_interval=5.0, gzip=False, retries=10, timeout=15, historical_migration=False, dedicated_ai_endpoint=False, capture_mode=CaptureMode.V0) class posthog.contexts.ContextScope(parent=None, fresh: bool = False, capture_exceptions: bool = True, client: Optional[Client] = None) @@ -970,9 +998,13 @@ function posthog.ai.utils.serialize_raw_usage(raw_usage: Any) -> Optional[Dict[s function posthog.ai.utils.with_privacy_mode(ph_client: PostHogClient, privacy_mode: bool, value: Any) function posthog.alias(previous_id: str, distinct_id: str, timestamp: Optional[datetime.datetime] = None, uuid: Optional[str] = None, disable_geoip: Optional[bool] = None) -> Optional[str] function posthog.capture(event: str, **kwargs: Unpack[OptionalCaptureArgs]) -> Optional[str] +function posthog.capture_compression.resolve_capture_compression(capture_compression: Optional[Union[CaptureCompression, str]] = None, *, gzip_fallback: bool = False) -> CaptureCompression function posthog.capture_exception(exception: Optional[ExceptionArg] = None, **kwargs: Unpack[OptionalCaptureArgs]) -> Optional[str] function posthog.capture_mode.resolve_capture_mode(capture_mode: Optional[Union[CaptureMode, str]] = None) -> CaptureMode -function posthog.capture_v1.build_v1_batch_body(events: list[dict], historical_migration: bool = False) -> dict +function posthog.capture_v1.build_v1_batch_body(events: list[dict], historical_migration: bool = False, created_at: Optional[str] = None) -> dict +function posthog.capture_v1.parse_v1_response(res: requests.Response) -> V1ParsedResponse +function posthog.capture_v1.post_v1(api_key: str, host: Optional[str], batch_body: dict, *, attempt: int, request_id: str, compression: CaptureCompression = CaptureCompression.NONE, timeout: int = 15, session: Optional[requests.Session] = None) -> requests.Response +function posthog.capture_v1.send_v1_batch(api_key: str, host: Optional[str], batch: list[dict], *, compression: CaptureCompression = CaptureCompression.NONE, timeout: int = 15, max_retries: int = 3, historical_migration: bool = False, session: Optional[requests.Session] = None) -> None function posthog.capture_v1.to_v1_event(msg: dict) -> dict function posthog.client.add_context_tags(properties) function posthog.client.get_identity_state(passed) -> tuple[str, bool] @@ -1321,6 +1353,7 @@ module posthog.ai.types module posthog.ai.utils module posthog.args module posthog.bucketed_rate_limiter +module posthog.capture_compression module posthog.capture_mode module posthog.capture_v1 module posthog.client