diff --git a/posthog/__init__.py b/posthog/__init__.py index 871b3ceb..444f043b 100644 --- a/posthog/__init__.py +++ b/posthog/__init__.py @@ -9,6 +9,7 @@ OptionalCaptureArgs, OptionalSetArgs, ) +from posthog.capture_mode import CaptureMode as CaptureMode from posthog.client import Client from posthog.exception_capture import ExceptionCapture from posthog.contexts import ( @@ -377,6 +378,9 @@ def get_tags() -> Dict[str, Any]: # We recommend setting this to False if you are only using the personalApiKey for evaluating remote config payloads via `get_remote_config_payload` and not using local evaluation. enable_local_evaluation = True # type: bool flag_definition_cache_provider = None # type: Optional[FlagDefinitionCacheProvider] +# Capture wire protocol for the global client. None defers to POSTHOG_CAPTURE_MODE +# then CaptureMode.V0. See posthog.capture_mode.CaptureMode. +capture_mode = None # type: Optional[CaptureMode] default_client = None # type: Optional[Client] @@ -1175,6 +1179,7 @@ def setup() -> Client: exception_autocapture_bucket_size=exception_autocapture_bucket_size, exception_autocapture_refill_rate=exception_autocapture_refill_rate, exception_autocapture_refill_interval_seconds=exception_autocapture_refill_interval_seconds, + capture_mode=capture_mode, ) # Always set in case user changes it. Preserve Client's auto-disabled state diff --git a/posthog/capture_mode.py b/posthog/capture_mode.py new file mode 100644 index 00000000..5bd761bf --- /dev/null +++ b/posthog/capture_mode.py @@ -0,0 +1,82 @@ +import logging +import os +from enum import Enum +from typing import Optional, Union + +log = logging.getLogger("posthog") + +CAPTURE_MODE_ENV_VAR = "POSTHOG_CAPTURE_MODE" + + +class CaptureMode(str, Enum): + """Selects the capture wire protocol used for event ingestion. + + ``V0`` is the legacy ``POST /batch/`` endpoint and the default, so upgrading + is transparent to existing callers. ``V1`` opts into + ``POST /i/v1/analytics/events`` (Bearer auth, per-event results, partial + retry). Inheriting from ``str`` keeps the members directly comparable to and + serializable as their ``"v0"`` / ``"v1"`` values. + """ + + V0 = "v0" + V1 = "v1" + + +# Accepted spellings for both the explicit kwarg and the env var. Aliases mirror +# the posthog-go naming (``legacy`` / ``analytics_v1``) so the two SDKs are +# configured with the same vocabulary. +_ALIASES: dict[str, CaptureMode] = { + "v0": CaptureMode.V0, + "legacy": CaptureMode.V0, + "v1": CaptureMode.V1, + "analytics_v1": CaptureMode.V1, +} + + +def _coerce_explicit(value: Union[CaptureMode, str]) -> CaptureMode: + """Normalize an explicitly-supplied capture mode to a ``CaptureMode``. + + Accepts a ``CaptureMode`` or one of the string aliases. An explicit but + unrecognized value is a programming error, so it raises ``ValueError`` rather + than silently defaulting (unlike the env var, which is operator-supplied and + defaults defensively). + """ + if isinstance(value, CaptureMode): + return value + if isinstance(value, str): + resolved = _ALIASES.get(value.strip().lower()) + if resolved is not None: + return resolved + raise ValueError( + f"invalid capture_mode {value!r}; expected a CaptureMode or one of " + f"{sorted(_ALIASES)}" + ) + + +def resolve_capture_mode( + capture_mode: Optional[Union[CaptureMode, str]] = None, +) -> CaptureMode: + """Resolve the effective capture mode. + + Precedence: explicit ``capture_mode`` argument > ``POSTHOG_CAPTURE_MODE`` env + var > ``CaptureMode.V0``. An unrecognized env value logs a warning and falls + back to ``V0`` so a typo never silently flips the wire protocol. + """ + if capture_mode is not None: + return _coerce_explicit(capture_mode) + + raw = os.environ.get(CAPTURE_MODE_ENV_VAR) + if raw is None or raw.strip() == "": + return CaptureMode.V0 + + resolved = _ALIASES.get(raw.strip().lower()) + if resolved is None: + log.warning( + "Unrecognized %s=%r; falling back to %s. Expected one of %s.", + CAPTURE_MODE_ENV_VAR, + raw, + CaptureMode.V0.value, + sorted(_ALIASES), + ) + return CaptureMode.V0 + return resolved diff --git a/posthog/client.py b/posthog/client.py index 1fbff223..00d9c284 100644 --- a/posthog/client.py +++ b/posthog/client.py @@ -16,6 +16,7 @@ from posthog._async_utils import _BackgroundEventLoopRunner from posthog.args import ID_TYPES, ExceptionArg, OptionalCaptureArgs, OptionalSetArgs +from posthog.capture_mode import CaptureMode, resolve_capture_mode from posthog.consumer import Consumer from posthog.contexts import ( _get_current_context, @@ -259,6 +260,7 @@ def __init__( exception_autocapture_bucket_size=ExceptionCapture.DEFAULT_BUCKET_SIZE, exception_autocapture_refill_rate=ExceptionCapture.DEFAULT_REFILL_RATE, exception_autocapture_refill_interval_seconds=ExceptionCapture.DEFAULT_REFILL_INTERVAL_SECONDS, + capture_mode: Optional[Union[CaptureMode, str]] = None, _dedicated_ai_endpoint=False, ): """ @@ -339,6 +341,11 @@ def __init__( interval for each exception type's bucket. exception_autocapture_refill_interval_seconds: Seconds between token refills for autocaptured exception rate limiting. + capture_mode: Capture wire protocol to use. Defaults to + ``CaptureMode.V0`` (legacy ``/batch/``). Set ``CaptureMode.V1`` + (or pass the string ``"v1"``) to opt into + ``/i/v1/analytics/events``. When omitted, the + ``POSTHOG_CAPTURE_MODE`` env var is consulted, then ``V0``. Examples: ```python @@ -391,6 +398,10 @@ def __init__( self.disable_geoip = disable_geoip self.is_server = is_server self.historical_migration = historical_migration + # Selects the capture wire protocol (V0 legacy `/batch/` vs V1 + # `/i/v1/analytics/events`). Resolved here so the env-var fallback is + # applied once; V0 is the default and keeps upgrades transparent. + self.capture_mode = resolve_capture_mode(capture_mode) # Internal, not ready for use: routes `$ai_*` events to a dedicated # capture-ai endpoint while the backend route + ingress roll out. self._dedicated_ai_endpoint = _dedicated_ai_endpoint @@ -496,6 +507,7 @@ def __init__( timeout=timeout, historical_migration=historical_migration, dedicated_ai_endpoint=self._dedicated_ai_endpoint, + capture_mode=self.capture_mode, ) self.consumers.append(consumer) @@ -1517,6 +1529,7 @@ def _reinit_after_fork(self): timeout=old.timeout, historical_migration=old.historical_migration, dedicated_ai_endpoint=old.dedicated_ai_endpoint, + capture_mode=old.capture_mode, ) new_consumers.append(consumer) diff --git a/posthog/consumer.py b/posthog/consumer.py index 567292e8..cf5256db 100644 --- a/posthog/consumer.py +++ b/posthog/consumer.py @@ -5,6 +5,7 @@ from threading import Thread from posthog._logging import _configure_posthog_logging +from posthog.capture_mode import CaptureMode from posthog.request import ( AI_EVENTS_ENDPOINT, EVENTS_ENDPOINT, @@ -50,6 +51,7 @@ def __init__( timeout=15, historical_migration=False, dedicated_ai_endpoint=False, + capture_mode=CaptureMode.V0, ): """Create a consumer thread.""" Thread.__init__(self) @@ -63,6 +65,7 @@ def __init__( self.queue = queue self.gzip = gzip self.dedicated_ai_endpoint = dedicated_ai_endpoint + self.capture_mode = capture_mode # It's important to set running in the constructor: if we are asked to # pause immediately after construction, we might set running to True in # run() *after* we set it to False in pause... and keep running diff --git a/posthog/test/test_capture_mode.py b/posthog/test/test_capture_mode.py new file mode 100644 index 00000000..23594120 --- /dev/null +++ b/posthog/test/test_capture_mode.py @@ -0,0 +1,109 @@ +import os +import unittest +from unittest import mock + +from parameterized import parameterized + +from posthog.capture_mode import ( + CAPTURE_MODE_ENV_VAR, + CaptureMode, + resolve_capture_mode, +) +from posthog.client import Client +from posthog.consumer import Consumer +from posthog.test.logging_helpers import capture_message_only_logs +from posthog.test.test_utils import TEST_API_KEY + + +class TestResolveCaptureMode(unittest.TestCase): + def test_defaults_to_v0_with_no_kwarg_and_no_env(self) -> None: + with mock.patch.dict(os.environ, {}, clear=False): + os.environ.pop(CAPTURE_MODE_ENV_VAR, None) + self.assertIs(resolve_capture_mode(None), CaptureMode.V0) + + @parameterized.expand( + [ + # (name, kwarg, expected, opposite_env): the env always names the + # mode the kwarg must override, so every row proves the kwarg wins. + ("enum_v0", CaptureMode.V0, CaptureMode.V0, "v1"), + ("enum_v1", CaptureMode.V1, CaptureMode.V1, "v0"), + ("str_v0", "v0", CaptureMode.V0, "v1"), + ("str_v1", "v1", CaptureMode.V1, "v0"), + ("str_legacy_alias", "legacy", CaptureMode.V0, "v1"), + ("str_analytics_v1_alias", "analytics_v1", CaptureMode.V1, "v0"), + ("str_upper_and_padded", " V1 ", CaptureMode.V1, "v0"), + ] + ) + def test_explicit_kwarg_takes_precedence_and_coerces( + self, _name, kwarg, expected, opposite_env + ) -> None: + with mock.patch.dict(os.environ, {CAPTURE_MODE_ENV_VAR: opposite_env}): + self.assertIs(resolve_capture_mode(kwarg), expected) + + def test_invalid_kwarg_raises_even_with_valid_env(self) -> None: + # The kwarg path is consulted before the env, so an invalid kwarg raises + # rather than silently falling back to a valid env value. + with mock.patch.dict(os.environ, {CAPTURE_MODE_ENV_VAR: "v1"}): + with self.assertRaises(ValueError): + resolve_capture_mode("bogus") + + @parameterized.expand( + [ + ("v0", "v0", CaptureMode.V0), + ("legacy", "legacy", CaptureMode.V0), + ("v1", "v1", CaptureMode.V1), + ("analytics_v1", "analytics_v1", CaptureMode.V1), + ("uppercase", "V1", CaptureMode.V1), + ("padded", " v1 ", CaptureMode.V1), + ] + ) + def test_env_var_resolution(self, _name, env_value, expected) -> None: + with mock.patch.dict(os.environ, {CAPTURE_MODE_ENV_VAR: env_value}): + self.assertIs(resolve_capture_mode(None), expected) + + @parameterized.expand([("empty", ""), ("whitespace", " ")]) + def test_blank_env_var_defaults_to_v0(self, _name, env_value) -> None: + with mock.patch.dict(os.environ, {CAPTURE_MODE_ENV_VAR: env_value}): + self.assertIs(resolve_capture_mode(None), CaptureMode.V0) + + def test_unrecognized_env_var_warns_and_defaults_to_v0(self) -> None: + with mock.patch.dict(os.environ, {CAPTURE_MODE_ENV_VAR: "bogus"}): + with capture_message_only_logs() as stream: + self.assertIs(resolve_capture_mode(None), CaptureMode.V0) + self.assertIn("bogus", stream.getvalue()) + + @parameterized.expand([("bad_str", "bogus"), ("wrong_type", 1)]) + def test_invalid_explicit_kwarg_raises(self, _name, value) -> None: + with self.assertRaises(ValueError): + resolve_capture_mode(value) + + +class TestCaptureModePlumbing(unittest.TestCase): + def test_client_resolves_and_stores_default_v0(self) -> None: + with mock.patch.dict(os.environ, {}, clear=False): + os.environ.pop(CAPTURE_MODE_ENV_VAR, None) + client = Client(TEST_API_KEY, sync_mode=True) + self.assertIs(client.capture_mode, CaptureMode.V0) + + @parameterized.expand( + [ + ("enum_v1", CaptureMode.V1, CaptureMode.V1), + ("str_v1", "v1", CaptureMode.V1), + ("enum_v0", CaptureMode.V0, CaptureMode.V0), + ] + ) + def test_client_kwarg_sets_mode(self, _name, kwarg, expected) -> None: + client = Client(TEST_API_KEY, sync_mode=True, capture_mode=kwarg) + self.assertIs(client.capture_mode, expected) + + def test_client_propagates_mode_to_consumers(self) -> None: + # Async (non-sync) client builds Consumer threads; assert each carries + # the resolved mode. + client = Client(TEST_API_KEY, capture_mode=CaptureMode.V1, send=False, thread=2) + self.assertEqual(len(client.consumers), 2) + for consumer in client.consumers: + self.assertIs(consumer.capture_mode, CaptureMode.V1) + + def test_consumer_defaults_to_v0(self) -> None: + consumer = Consumer(None, TEST_API_KEY) + self.assertIs(consumer.capture_mode, CaptureMode.V0) diff --git a/references/public_api_snapshot.txt b/references/public_api_snapshot.txt index 772cf18d..0955c651 100644 --- a/references/public_api_snapshot.txt +++ b/references/public_api_snapshot.txt @@ -4,6 +4,7 @@ # members. Modules with __all__ use it; other modules include non-underscore # names. External imports are excluded. alias posthog.BeforeSendCallback -> posthog.types.BeforeSendCallback +alias posthog.CaptureMode -> posthog.capture_mode.CaptureMode alias posthog.Client -> posthog.client.Client alias posthog.DEFAULT_CODE_VARIABLES_DETECT_SECRETS -> posthog.exception_utils.DEFAULT_CODE_VARIABLES_DETECT_SECRETS alias posthog.DEFAULT_CODE_VARIABLES_IGNORE_PATTERNS -> posthog.exception_utils.DEFAULT_CODE_VARIABLES_IGNORE_PATTERNS @@ -216,6 +217,7 @@ alias posthog.args.FeatureFlagEvaluations -> posthog.feature_flag_evaluations.Fe alias posthog.args.SendFeatureFlagsOptions -> posthog.types.SendFeatureFlagsOptions alias posthog.client.AI_EVENTS_ENDPOINT -> posthog.request.AI_EVENTS_ENDPOINT alias posthog.client.APIError -> posthog.request.APIError +alias posthog.client.CaptureMode -> posthog.capture_mode.CaptureMode alias posthog.client.Consumer -> posthog.consumer.Consumer alias posthog.client.DEFAULT_CODE_VARIABLES_DETECT_SECRETS -> posthog.exception_utils.DEFAULT_CODE_VARIABLES_DETECT_SECRETS alias posthog.client.DEFAULT_CODE_VARIABLES_IGNORE_PATTERNS -> posthog.exception_utils.DEFAULT_CODE_VARIABLES_IGNORE_PATTERNS @@ -275,6 +277,7 @@ alias posthog.client.normalize_host -> posthog.request.normalize_host alias posthog.client.remote_config -> posthog.request.remote_config alias posthog.client.reset_sessions -> posthog.request.reset_sessions alias posthog.client.resolve_bucketing_value -> posthog.feature_flags.resolve_bucketing_value +alias posthog.client.resolve_capture_mode -> posthog.capture_mode.resolve_capture_mode alias posthog.client.system_context -> posthog.utils.system_context alias posthog.client.to_flags_and_payloads -> posthog.types.to_flags_and_payloads alias posthog.client.to_payloads -> posthog.types.to_payloads @@ -282,6 +285,7 @@ alias posthog.client.to_values -> posthog.types.to_values alias posthog.client.try_attach_code_variables_to_frames -> posthog.exception_utils.try_attach_code_variables_to_frames alias posthog.consumer.AI_EVENTS_ENDPOINT -> posthog.request.AI_EVENTS_ENDPOINT alias posthog.consumer.APIError -> posthog.request.APIError +alias posthog.consumer.CaptureMode -> posthog.capture_mode.CaptureMode alias posthog.consumer.DatetimeSerializer -> posthog.request.DatetimeSerializer alias posthog.consumer.EVENTS_ENDPOINT -> posthog.request.EVENTS_ENDPOINT alias posthog.consumer.batch_post -> posthog.request.batch_post @@ -471,8 +475,13 @@ attribute posthog.bucketed_rate_limiter.Number = Union[int, float] attribute posthog.bucketed_rate_limiter.ONE_DAY_IN_SECONDS = 86400.0 attribute posthog.bucketed_rate_limiter.log = logging.getLogger('posthog') attribute posthog.capture_exception_code_variables = False +attribute posthog.capture_mode.CAPTURE_MODE_ENV_VAR = 'POSTHOG_CAPTURE_MODE' +attribute posthog.capture_mode.CaptureMode.V0 = 'v0' +attribute posthog.capture_mode.CaptureMode.V1 = 'v1' +attribute posthog.capture_mode.log = logging.getLogger('posthog') attribute posthog.client.Client.api_key = (project_api_key or '').strip() attribute posthog.client.Client.capture_exception_code_variables = capture_exception_code_variables +attribute posthog.client.Client.capture_mode = resolve_capture_mode(capture_mode) attribute posthog.client.Client.code_variables_detect_secrets = code_variables_detect_secrets if code_variables_detect_secrets is not None else DEFAULT_CODE_VARIABLES_DETECT_SECRETS attribute posthog.client.Client.code_variables_ignore_patterns = code_variables_ignore_patterns if code_variables_ignore_patterns is not None else DEFAULT_CODE_VARIABLES_IGNORE_PATTERNS attribute posthog.client.Client.code_variables_mask_patterns = code_variables_mask_patterns if code_variables_mask_patterns is not None else DEFAULT_CODE_VARIABLES_MASK_PATTERNS @@ -524,6 +533,7 @@ attribute posthog.code_variables_mask_url_credentials = DEFAULT_CODE_VARIABLES_M attribute posthog.consumer.AI_MAX_MSG_SIZE = 8 * 1024 * 1024 attribute posthog.consumer.BATCH_SIZE_LIMIT = 5 * 1024 * 1024 attribute posthog.consumer.Consumer.api_key = api_key +attribute posthog.consumer.Consumer.capture_mode = capture_mode attribute posthog.consumer.Consumer.daemon = True attribute posthog.consumer.Consumer.dedicated_ai_endpoint = dedicated_ai_endpoint attribute posthog.consumer.Consumer.flush_at = flush_at @@ -826,8 +836,9 @@ class posthog.ai.types.ToolInProgress class posthog.args.OptionalCaptureArgs class posthog.args.OptionalSetArgs class posthog.bucketed_rate_limiter.BucketedRateLimiter(bucket_size: Number, refill_rate: Number, refill_interval_seconds: Number, on_bucket_rate_limited: Optional[Callable[[Hashable], None]] = None, clock: Callable[[], float] = time.monotonic) -class posthog.client.Client(project_api_key: str, host=None, debug=False, max_queue_size=10000, send=True, on_error=None, flush_at=100, flush_interval=5.0, gzip=False, max_retries=3, sync_mode=False, timeout=15, thread=1, poll_interval=30, personal_api_key=None, disabled=False, disable_geoip=True, is_server=True, historical_migration=False, feature_flags_request_timeout_seconds=3, super_properties=None, enable_exception_autocapture=False, log_captured_exceptions=False, project_root=None, privacy_mode=False, before_send=None, flag_fallback_cache_url=None, enable_local_evaluation=True, flag_definition_cache_provider: Optional[FlagDefinitionCacheProvider] = None, capture_exception_code_variables=False, code_variables_mask_patterns=None, code_variables_ignore_patterns=None, code_variables_mask_url_credentials=None, code_variables_detect_secrets=None, in_app_modules: list[str] | None = None, enable_exception_autocapture_rate_limiting=False, exception_autocapture_bucket_size=ExceptionCapture.DEFAULT_BUCKET_SIZE, exception_autocapture_refill_rate=ExceptionCapture.DEFAULT_REFILL_RATE, exception_autocapture_refill_interval_seconds=ExceptionCapture.DEFAULT_REFILL_INTERVAL_SECONDS, _dedicated_ai_endpoint=False) -class posthog.consumer.Consumer(queue, api_key, flush_at=100, host=None, on_error=None, flush_interval=5.0, gzip=False, retries=10, timeout=15, historical_migration=False, dedicated_ai_endpoint=False) +class posthog.capture_mode.CaptureMode +class posthog.client.Client(project_api_key: str, host=None, debug=False, max_queue_size=10000, send=True, on_error=None, flush_at=100, flush_interval=5.0, gzip=False, max_retries=3, sync_mode=False, timeout=15, thread=1, poll_interval=30, personal_api_key=None, disabled=False, disable_geoip=True, is_server=True, historical_migration=False, feature_flags_request_timeout_seconds=3, super_properties=None, enable_exception_autocapture=False, log_captured_exceptions=False, project_root=None, privacy_mode=False, before_send=None, flag_fallback_cache_url=None, enable_local_evaluation=True, flag_definition_cache_provider: Optional[FlagDefinitionCacheProvider] = None, capture_exception_code_variables=False, code_variables_mask_patterns=None, code_variables_ignore_patterns=None, code_variables_mask_url_credentials=None, code_variables_detect_secrets=None, in_app_modules: list[str] | None = None, enable_exception_autocapture_rate_limiting=False, exception_autocapture_bucket_size=ExceptionCapture.DEFAULT_BUCKET_SIZE, exception_autocapture_refill_rate=ExceptionCapture.DEFAULT_REFILL_RATE, exception_autocapture_refill_interval_seconds=ExceptionCapture.DEFAULT_REFILL_INTERVAL_SECONDS, capture_mode: Optional[Union[CaptureMode, str]] = None, _dedicated_ai_endpoint=False) +class posthog.consumer.Consumer(queue, api_key, flush_at=100, host=None, on_error=None, flush_interval=5.0, gzip=False, retries=10, timeout=15, historical_migration=False, dedicated_ai_endpoint=False, capture_mode=CaptureMode.V0) class posthog.contexts.ContextScope(parent=None, fresh: bool = False, capture_exceptions: bool = True, client: Optional[Client] = None) class posthog.exception_capture.ExceptionCapture(client: Client, rate_limiting_enabled=False, bucket_size=DEFAULT_BUCKET_SIZE, refill_rate=DEFAULT_REFILL_RATE, refill_interval_seconds=DEFAULT_REFILL_INTERVAL_SECONDS) class posthog.exception_utils.AnnotatedValue(value, metadata) @@ -949,6 +960,7 @@ function posthog.ai.utils.with_privacy_mode(ph_client: PostHogClient, privacy_mo function posthog.alias(previous_id: str, distinct_id: str, timestamp: Optional[datetime.datetime] = None, uuid: Optional[str] = None, disable_geoip: Optional[bool] = None) -> Optional[str] function posthog.capture(event: str, **kwargs: Unpack[OptionalCaptureArgs]) -> Optional[str] function posthog.capture_exception(exception: Optional[ExceptionArg] = None, **kwargs: Unpack[OptionalCaptureArgs]) -> Optional[str] +function posthog.capture_mode.resolve_capture_mode(capture_mode: Optional[Union[CaptureMode, str]] = None) -> CaptureMode function posthog.client.add_context_tags(properties) function posthog.client.get_identity_state(passed) -> tuple[str, bool] function posthog.client.no_throw(default_return=None) @@ -1296,6 +1308,7 @@ module posthog.ai.types module posthog.ai.utils module posthog.args module posthog.bucketed_rate_limiter +module posthog.capture_mode module posthog.client module posthog.consumer module posthog.contexts