Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions posthog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
OptionalCaptureArgs,
OptionalSetArgs,
)
from posthog.capture_mode import CaptureMode as CaptureMode
from posthog.client import Client
from posthog.exception_capture import ExceptionCapture
from posthog.contexts import (
Expand Down Expand Up @@ -377,6 +378,9 @@ def get_tags() -> Dict[str, Any]:
# We recommend setting this to False if you are only using the personalApiKey for evaluating remote config payloads via `get_remote_config_payload` and not using local evaluation.
enable_local_evaluation = True # type: bool
flag_definition_cache_provider = None # type: Optional[FlagDefinitionCacheProvider]
# Capture wire protocol for the global client. When left as None, the
# POSTHOG_CAPTURE_MODE env var is consulted, falling back to CaptureMode.V0.
# See posthog.capture_mode.CaptureMode.
capture_mode = None # type: Optional[CaptureMode]

default_client = None # type: Optional[Client]

Expand Down Expand Up @@ -1175,6 +1179,7 @@ def setup() -> Client:
exception_autocapture_bucket_size=exception_autocapture_bucket_size,
exception_autocapture_refill_rate=exception_autocapture_refill_rate,
exception_autocapture_refill_interval_seconds=exception_autocapture_refill_interval_seconds,
capture_mode=capture_mode,
)

# Always set in case user changes it. Preserve Client's auto-disabled state
Expand Down
82 changes: 82 additions & 0 deletions posthog/capture_mode.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import logging
import os
from enum import Enum
from typing import Optional, Union

# Logger registered under the fixed name "posthog" (not ``__name__``).
log = logging.getLogger("posthog")

# Name of the environment variable that resolve_capture_mode() reads when no
# explicit capture mode is passed in.
CAPTURE_MODE_ENV_VAR = "POSTHOG_CAPTURE_MODE"


class CaptureMode(str, Enum):
    """Wire protocol used when sending captured events for ingestion.

    Members:
        V0: the legacy ``POST /batch/`` endpoint. It is the default, which
            keeps upgrades transparent for existing callers.
        V1: the opt-in ``POST /i/v1/analytics/events`` endpoint (Bearer auth,
            per-event results, partial retry).

    The ``str`` mixin makes every member compare equal to, and serialize as,
    its plain ``"v0"`` / ``"v1"`` value.
    """

    V0 = "v0"
    V1 = "v1"


# Accepted spellings for both the explicit kwarg and the env var. Keys are
# lowercase: both lookup sites strip and lowercase their input before consulting
# this map, so matching is case- and padding-insensitive. Aliases mirror the
# posthog-go naming (``legacy`` / ``analytics_v1``) so the two SDKs are
# configured with the same vocabulary.
_ALIASES: dict[str, CaptureMode] = {
    "v0": CaptureMode.V0,
    "legacy": CaptureMode.V0,
    "v1": CaptureMode.V1,
    "analytics_v1": CaptureMode.V1,
}


def _coerce_explicit(value: Union[CaptureMode, str]) -> CaptureMode:
    """Turn a caller-supplied capture mode into a ``CaptureMode`` member.

    Args:
        value: A ``CaptureMode`` member, or a string alias. Strings are matched
            after stripping surrounding whitespace and lowercasing.

    Returns:
        The matching ``CaptureMode`` member.

    Raises:
        ValueError: If ``value`` is neither a ``CaptureMode`` nor a known
            alias. An explicit bad value is treated as a programming error, so
            it fails loudly instead of defaulting the way the env var does.
    """
    # Enum members pass straight through.
    if isinstance(value, CaptureMode):
        return value

    # Only strings can be aliases; any other type falls into the error path.
    mode = _ALIASES.get(value.strip().lower()) if isinstance(value, str) else None
    if mode is None:
        raise ValueError(
            f"invalid capture_mode {value!r}; expected a CaptureMode or one of "
            f"{sorted(_ALIASES)}"
        )
    return mode


def resolve_capture_mode(
    capture_mode: Optional[Union[CaptureMode, str]] = None,
) -> CaptureMode:
    """Work out which capture mode is in effect.

    Sources are consulted in order: the explicit ``capture_mode`` argument,
    then the ``POSTHOG_CAPTURE_MODE`` environment variable, then the
    ``CaptureMode.V0`` default.

    Args:
        capture_mode: Explicit mode (member or string alias), or ``None`` to
            defer to the environment.

    Returns:
        The resolved ``CaptureMode`` member.

    Raises:
        ValueError: If an explicit ``capture_mode`` is not recognized. A bad
            environment value never raises; it is logged as a warning and
            ``V0`` is used, so a typo cannot flip the wire protocol.
    """
    if capture_mode is not None:
        return _coerce_explicit(capture_mode)

    env_value = os.environ.get(CAPTURE_MODE_ENV_VAR)
    # An unset variable and a blank/whitespace-only one both mean "not configured".
    token = "" if env_value is None else env_value.strip()
    if not token:
        return CaptureMode.V0

    mode = _ALIASES.get(token.lower())
    if mode is not None:
        return mode

    # The warning carries the raw (unstripped) value so the operator sees
    # exactly what was set.
    log.warning(
        "Unrecognized %s=%r; falling back to %s. Expected one of %s.",
        CAPTURE_MODE_ENV_VAR,
        env_value,
        CaptureMode.V0.value,
        sorted(_ALIASES),
    )
    return CaptureMode.V0
13 changes: 13 additions & 0 deletions posthog/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from posthog._async_utils import _BackgroundEventLoopRunner
from posthog.args import ID_TYPES, ExceptionArg, OptionalCaptureArgs, OptionalSetArgs
from posthog.capture_mode import CaptureMode, resolve_capture_mode
from posthog.consumer import Consumer
from posthog.contexts import (
_get_current_context,
Expand Down Expand Up @@ -259,6 +260,7 @@ def __init__(
exception_autocapture_bucket_size=ExceptionCapture.DEFAULT_BUCKET_SIZE,
exception_autocapture_refill_rate=ExceptionCapture.DEFAULT_REFILL_RATE,
exception_autocapture_refill_interval_seconds=ExceptionCapture.DEFAULT_REFILL_INTERVAL_SECONDS,
capture_mode: Optional[Union[CaptureMode, str]] = None,
_dedicated_ai_endpoint=False,
):
"""
Expand Down Expand Up @@ -339,6 +341,11 @@ def __init__(
interval for each exception type's bucket.
exception_autocapture_refill_interval_seconds: Seconds between
token refills for autocaptured exception rate limiting.
capture_mode: Capture wire protocol to use. Defaults to
``CaptureMode.V0`` (legacy ``/batch/``). Set ``CaptureMode.V1``
(or pass the string ``"v1"``) to opt into
``/i/v1/analytics/events``. When omitted, the
``POSTHOG_CAPTURE_MODE`` env var is consulted, then ``V0``.

Examples:
```python
Expand Down Expand Up @@ -391,6 +398,10 @@ def __init__(
self.disable_geoip = disable_geoip
self.is_server = is_server
self.historical_migration = historical_migration
# Selects the capture wire protocol (V0 legacy `/batch/` vs V1
# `/i/v1/analytics/events`). Resolved here so the env-var fallback is
# applied once; V0 is the default and keeps upgrades transparent.
self.capture_mode = resolve_capture_mode(capture_mode)
# Internal, not ready for use: routes `$ai_*` events to a dedicated
# capture-ai endpoint while the backend route + ingress roll out.
self._dedicated_ai_endpoint = _dedicated_ai_endpoint
Expand Down Expand Up @@ -496,6 +507,7 @@ def __init__(
timeout=timeout,
historical_migration=historical_migration,
dedicated_ai_endpoint=self._dedicated_ai_endpoint,
capture_mode=self.capture_mode,
)
self.consumers.append(consumer)

Expand Down Expand Up @@ -1517,6 +1529,7 @@ def _reinit_after_fork(self):
timeout=old.timeout,
historical_migration=old.historical_migration,
dedicated_ai_endpoint=old.dedicated_ai_endpoint,
capture_mode=old.capture_mode,
)
new_consumers.append(consumer)

Expand Down
3 changes: 3 additions & 0 deletions posthog/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from threading import Thread

from posthog._logging import _configure_posthog_logging
from posthog.capture_mode import CaptureMode
from posthog.request import (
AI_EVENTS_ENDPOINT,
EVENTS_ENDPOINT,
Expand Down Expand Up @@ -50,6 +51,7 @@ def __init__(
timeout=15,
historical_migration=False,
dedicated_ai_endpoint=False,
capture_mode=CaptureMode.V0,
):
"""Create a consumer thread."""
Thread.__init__(self)
Expand All @@ -63,6 +65,7 @@ def __init__(
self.queue = queue
self.gzip = gzip
self.dedicated_ai_endpoint = dedicated_ai_endpoint
self.capture_mode = capture_mode
# It's important to set running in the constructor: if we are asked to
# pause immediately after construction, we might set running to True in
# run() *after* we set it to False in pause... and keep running
Expand Down
109 changes: 109 additions & 0 deletions posthog/test/test_capture_mode.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import os
import unittest
from unittest import mock

from parameterized import parameterized

from posthog.capture_mode import (
CAPTURE_MODE_ENV_VAR,
CaptureMode,
resolve_capture_mode,
)
from posthog.client import Client
from posthog.consumer import Consumer
from posthog.test.logging_helpers import capture_message_only_logs
from posthog.test.test_utils import TEST_API_KEY


class TestResolveCaptureMode(unittest.TestCase):
    """Precedence and coercion rules of ``resolve_capture_mode``."""

    def test_defaults_to_v0_with_no_kwarg_and_no_env(self) -> None:
        # patch.dict restores os.environ on exit, so the pop stays test-local.
        with mock.patch.dict(os.environ, {}, clear=False):
            os.environ.pop(CAPTURE_MODE_ENV_VAR, None)
            resolved = resolve_capture_mode(None)
            self.assertIs(resolved, CaptureMode.V0)

    @parameterized.expand(
        [
            # (name, kwarg, expected, opposite_env): the env always names the
            # mode the kwarg must override, so every row proves the kwarg wins.
            ("enum_v0", CaptureMode.V0, CaptureMode.V0, "v1"),
            ("enum_v1", CaptureMode.V1, CaptureMode.V1, "v0"),
            ("str_v0", "v0", CaptureMode.V0, "v1"),
            ("str_v1", "v1", CaptureMode.V1, "v0"),
            ("str_legacy_alias", "legacy", CaptureMode.V0, "v1"),
            ("str_analytics_v1_alias", "analytics_v1", CaptureMode.V1, "v0"),
            ("str_upper_and_padded", " V1 ", CaptureMode.V1, "v0"),
        ]
    )
    def test_explicit_kwarg_takes_precedence_and_coerces(
        self, _name, kwarg, expected, opposite_env
    ) -> None:
        conflicting_env = {CAPTURE_MODE_ENV_VAR: opposite_env}
        with mock.patch.dict(os.environ, conflicting_env):
            self.assertIs(resolve_capture_mode(kwarg), expected)

    def test_invalid_kwarg_raises_even_with_valid_env(self) -> None:
        # The kwarg path is consulted before the env, so an invalid kwarg raises
        # rather than silently falling back to a valid env value.
        valid_env = mock.patch.dict(os.environ, {CAPTURE_MODE_ENV_VAR: "v1"})
        with valid_env, self.assertRaises(ValueError):
            resolve_capture_mode("bogus")

    @parameterized.expand(
        [
            ("v0", "v0", CaptureMode.V0),
            ("legacy", "legacy", CaptureMode.V0),
            ("v1", "v1", CaptureMode.V1),
            ("analytics_v1", "analytics_v1", CaptureMode.V1),
            ("uppercase", "V1", CaptureMode.V1),
            ("padded", " v1 ", CaptureMode.V1),
        ]
    )
    def test_env_var_resolution(self, _name, env_value, expected) -> None:
        env = {CAPTURE_MODE_ENV_VAR: env_value}
        with mock.patch.dict(os.environ, env):
            self.assertIs(resolve_capture_mode(None), expected)

    @parameterized.expand([("empty", ""), ("whitespace", " ")])
    def test_blank_env_var_defaults_to_v0(self, _name, env_value) -> None:
        env = {CAPTURE_MODE_ENV_VAR: env_value}
        with mock.patch.dict(os.environ, env):
            self.assertIs(resolve_capture_mode(None), CaptureMode.V0)

    def test_unrecognized_env_var_warns_and_defaults_to_v0(self) -> None:
        bad_env = mock.patch.dict(os.environ, {CAPTURE_MODE_ENV_VAR: "bogus"})
        with bad_env, capture_message_only_logs() as stream:
            self.assertIs(resolve_capture_mode(None), CaptureMode.V0)
            # The warning must echo the offending value back to the operator.
            self.assertIn("bogus", stream.getvalue())

    @parameterized.expand([("bad_str", "bogus"), ("wrong_type", 1)])
    def test_invalid_explicit_kwarg_raises(self, _name, value) -> None:
        with self.assertRaises(ValueError):
            resolve_capture_mode(value)


class TestCaptureModePlumbing(unittest.TestCase):
    """Checks that the resolved mode reaches ``Client`` and its consumers."""

    def test_client_resolves_and_stores_default_v0(self) -> None:
        # Drop the env var for the duration of the test so only the built-in
        # default can apply; patch.dict restores os.environ afterwards.
        with mock.patch.dict(os.environ, {}, clear=False):
            os.environ.pop(CAPTURE_MODE_ENV_VAR, None)
            client = Client(TEST_API_KEY, sync_mode=True)
            self.assertIs(client.capture_mode, CaptureMode.V0)

    @parameterized.expand(
        [
            ("enum_v1", CaptureMode.V1, CaptureMode.V1),
            ("str_v1", "v1", CaptureMode.V1),
            ("enum_v0", CaptureMode.V0, CaptureMode.V0),
        ]
    )
    def test_client_kwarg_sets_mode(self, _name, kwarg, expected) -> None:
        client = Client(TEST_API_KEY, sync_mode=True, capture_mode=kwarg)
        self.assertIs(client.capture_mode, expected)

    def test_client_propagates_mode_to_consumers(self) -> None:
        # Async (non-sync) client builds Consumer threads; assert each carries
        # the resolved mode.
        client = Client(
            TEST_API_KEY,
            capture_mode=CaptureMode.V1,
            send=False,
            thread=2,
        )
        workers = client.consumers
        self.assertEqual(len(workers), 2)
        for worker in workers:
            self.assertIs(worker.capture_mode, CaptureMode.V1)

    def test_consumer_defaults_to_v0(self) -> None:
        # No capture_mode passed: the Consumer's own default applies.
        consumer = Consumer(None, TEST_API_KEY)
        self.assertIs(consumer.capture_mode, CaptureMode.V0)
Loading
Loading