Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 36 additions & 4 deletions posthog/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@

from posthog._async_utils import _BackgroundEventLoopRunner
from posthog.args import ID_TYPES, ExceptionArg, OptionalCaptureArgs, OptionalSetArgs
from posthog.capture_compression import (
CaptureCompression,
resolve_capture_compression,
)
from posthog.capture_mode import CaptureMode, resolve_capture_mode
from posthog.capture_v1 import send_v1_batch
from posthog.consumer import Consumer
from posthog.contexts import (
_get_current_context,
Expand Down Expand Up @@ -261,6 +266,7 @@ def __init__(
exception_autocapture_refill_rate=ExceptionCapture.DEFAULT_REFILL_RATE,
exception_autocapture_refill_interval_seconds=ExceptionCapture.DEFAULT_REFILL_INTERVAL_SECONDS,
capture_mode: Optional[Union[CaptureMode, str]] = None,
capture_compression: Optional[Union[CaptureCompression, str]] = None,
_dedicated_ai_endpoint=False,
):
"""
Expand Down Expand Up @@ -346,6 +352,11 @@ def __init__(
(or pass the string ``"v1"``) to opt into
``/i/v1/analytics/events``. When omitted, the
``POSTHOG_CAPTURE_MODE`` env var is consulted, then ``V0``.
capture_compression: Request-body compression for capture-v1 uploads
(ignored in V0, which uses ``gzip``). ``CaptureCompression.GZIP``
or ``DEFLATE`` (or the strings ``"gzip"``/``"deflate"``). When
omitted, the ``POSTHOG_CAPTURE_COMPRESSION`` env var is consulted,
then the legacy ``gzip`` flag, then no compression.

Examples:
```python
Expand Down Expand Up @@ -373,6 +384,7 @@ def __init__(
self._duplicate_client_registry_key: Optional[tuple[str, str]] = None
self.gzip = gzip
self.timeout = timeout
self.max_retries = max_retries
self._feature_flags: Optional[list[Any]] = (
None # private variable to store flags
)
Expand Down Expand Up @@ -402,6 +414,11 @@ def __init__(
# `/i/v1/analytics/events`). Resolved here so the env-var fallback is
# applied once; V0 is the default and keeps upgrades transparent.
self.capture_mode = resolve_capture_mode(capture_mode)
# v1-only request compression; falls back to the legacy `gzip` flag when
# neither the kwarg nor POSTHOG_CAPTURE_COMPRESSION is set.
self.capture_compression = resolve_capture_compression(
capture_compression, gzip_fallback=gzip
)
# Internal, not ready for use: routes `$ai_*` events to a dedicated
# capture-ai endpoint while the backend route + ingress roll out.
self._dedicated_ai_endpoint = _dedicated_ai_endpoint
Expand Down Expand Up @@ -508,6 +525,7 @@ def __init__(
historical_migration=historical_migration,
dedicated_ai_endpoint=self._dedicated_ai_endpoint,
capture_mode=self.capture_mode,
capture_compression=self.capture_compression,
)
self.consumers.append(consumer)

Expand Down Expand Up @@ -1530,6 +1548,7 @@ def _reinit_after_fork(self):
historical_migration=old.historical_migration,
dedicated_ai_endpoint=old.dedicated_ai_endpoint,
capture_mode=old.capture_mode,
capture_compression=old.capture_compression,
)
new_consumers.append(consumer)

Expand Down Expand Up @@ -1629,11 +1648,24 @@ def _enqueue(self, msg, disable_geoip):

if self.sync_mode:
self.log.debug("enqueued with blocking %s.", msg["event"])
path = (
AI_EVENTS_ENDPOINT
if self._dedicated_ai_endpoint and is_ai_event(msg.get("event"))
else EVENTS_ENDPOINT
is_dedicated_ai = self._dedicated_ai_endpoint and is_ai_event(
msg.get("event")
)
# Analytics events follow `capture_mode`; the dedicated AI endpoint
# has no v1 form and always uses the legacy submitter.
if not is_dedicated_ai and self.capture_mode == CaptureMode.V1:
send_v1_batch(
self.api_key,
self.host,
[msg],
compression=self.capture_compression,
timeout=self.timeout,
max_retries=self.max_retries,
historical_migration=self.historical_migration,
)
return sent_uuid

path = AI_EVENTS_ENDPOINT if is_dedicated_ai else EVENTS_ENDPOINT
batch_post(
self.api_key,
self.host,
Expand Down
39 changes: 33 additions & 6 deletions posthog/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
from threading import Thread

from posthog._logging import _configure_posthog_logging
from posthog.capture_compression import CaptureCompression
from posthog.capture_mode import CaptureMode
from posthog.capture_v1 import send_v1_batch
from posthog.request import (
AI_EVENTS_ENDPOINT,
EVENTS_ENDPOINT,
Expand Down Expand Up @@ -52,6 +54,7 @@ def __init__(
historical_migration=False,
dedicated_ai_endpoint=False,
capture_mode=CaptureMode.V0,
capture_compression=CaptureCompression.NONE,
):
"""Create a consumer thread."""
Thread.__init__(self)
Expand All @@ -66,6 +69,7 @@ def __init__(
self.gzip = gzip
self.dedicated_ai_endpoint = dedicated_ai_endpoint
self.capture_mode = capture_mode
self.capture_compression = capture_compression
# It's important to set running in the constructor: if we are asked to
# pause immediately after construction, we might set running to True in
# run() *after* we set it to False in pause... and keep running
Expand Down Expand Up @@ -154,9 +158,13 @@ def request(self, batch):
invokes `on_error`); a second is logged here so it isn't silently lost.
The batch was already dequeued in `upload()`, so unsent events are dropped
after retries, same as the single-endpoint path.

The analytics destination follows `capture_mode` (v1 -> partial-retry
submitter); the dedicated AI endpoint has no v1 form and always uses the
legacy submitter.
"""
if not self.dedicated_ai_endpoint:
self._send(batch, EVENTS_ENDPOINT)
self._send_analytics(batch)
return

ai_events: list[Any] = []
Expand All @@ -166,23 +174,42 @@ def request(self, batch):
target.append(item)

first_exc = None
for events, path in (
(analytics_events, EVENTS_ENDPOINT),
(ai_events, AI_EVENTS_ENDPOINT),
for events, label, sender in (
(analytics_events, "analytics", self._send_analytics),
(ai_events, "ai", self._send_ai),
):
if not events:
continue
try:
self._send(events, path)
sender(events)
except Exception as e:
if first_exc is None:
first_exc = e
else:
self.log.error("error uploading to %s: %s", path, e)
self.log.error("error uploading to %s: %s", label, e)
Comment on lines +177 to +189

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Dead label entry for analytics in the loop tuple

The "analytics" string in the analytics tuple is logically unreachable in the self.log.error branch. Because analytics is always the first iteration, first_exc is always None when it runs β€” so a failure there takes the first_exc = e branch, never self.log.error(..., label, e). That log path is only reachable on the second failing iteration, which is always AI. The "analytics" label is therefore dead code in the logging context, and the inconsistency with AI_EVENTS_ENDPOINT (a path constant) adds unnecessary noise. Consider using a consistent format for both labels (e.g. two plain strings, "analytics" and "AI") and accepting that only the second sender's label ever reaches the log.

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!


if first_exc is not None:
raise first_exc

def _send_analytics(self, batch):
"""Submit analytics events via the wire protocol selected by `capture_mode`."""
if self.capture_mode == CaptureMode.V1:
send_v1_batch(
self.api_key,
self.host,
batch,
compression=self.capture_compression,
timeout=self.timeout,
max_retries=self.retries,
historical_migration=self.historical_migration,
)
return
self._send(batch, EVENTS_ENDPOINT)

def _send_ai(self, batch):
"""Submit `$ai_*` events to the dedicated legacy AI endpoint (no v1 form)."""
self._send(batch, AI_EVENTS_ENDPOINT)

def _send(self, batch, path):
"""Attempt to upload a single batch to `path`, retrying before raising an error"""

Expand Down
48 changes: 48 additions & 0 deletions posthog/test/test_capture_compression.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@
CaptureCompression,
resolve_capture_compression,
)
from posthog.client import Client
from posthog.consumer import Consumer
from posthog.test.logging_helpers import capture_message_only_logs
from posthog.test.test_utils import TEST_API_KEY


class TestResolveCaptureCompression(unittest.TestCase):
Expand Down Expand Up @@ -98,3 +101,48 @@ def test_unrecognized_env_var_warns_and_uses_fallback(self) -> None:
CaptureCompression.GZIP,
)
self.assertIn("bogus", stream.getvalue())


class TestCaptureCompressionPlumbing(unittest.TestCase):
def setUp(self) -> None:
patcher = mock.patch.dict(os.environ, {}, clear=False)
patcher.start()
self.addCleanup(patcher.stop)
os.environ.pop(CAPTURE_COMPRESSION_ENV_VAR, None)

def test_client_defaults_to_none(self) -> None:
client = Client(TEST_API_KEY, sync_mode=True)
self.assertIs(client.capture_compression, CaptureCompression.NONE)

def test_client_gzip_flag_falls_back_to_gzip(self) -> None:
client = Client(TEST_API_KEY, sync_mode=True, gzip=True)
self.assertIs(client.capture_compression, CaptureCompression.GZIP)

@parameterized.expand(
[
("enum_deflate", CaptureCompression.DEFLATE, CaptureCompression.DEFLATE),
("str_gzip", "gzip", CaptureCompression.GZIP),
("str_none", "none", CaptureCompression.NONE),
]
)
def test_client_kwarg_overrides_gzip_flag(self, _name, kwarg, expected) -> None:
# Even with the legacy gzip flag on, the explicit kwarg wins.
client = Client(
TEST_API_KEY, sync_mode=True, gzip=True, capture_compression=kwarg
)
self.assertIs(client.capture_compression, expected)

def test_client_propagates_to_consumers(self) -> None:
client = Client(
TEST_API_KEY,
capture_compression=CaptureCompression.DEFLATE,
send=False,
thread=2,
)
self.assertEqual(len(client.consumers), 2)
for consumer in client.consumers:
self.assertIs(consumer.capture_compression, CaptureCompression.DEFLATE)

def test_consumer_defaults_to_none(self) -> None:
consumer = Consumer(None, TEST_API_KEY)
self.assertIs(consumer.capture_compression, CaptureCompression.NONE)
81 changes: 81 additions & 0 deletions posthog/test/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from unittest import mock
from parameterized import parameterized

from posthog.capture_compression import CaptureCompression
from posthog.client import Client
from posthog.contexts import get_context_session_id, new_context, set_context_session
from posthog.request import APIError, GetResponse
Expand Down Expand Up @@ -3221,3 +3222,83 @@ def test_debug_flag_re_raises_exceptions(self, mock_enqueue):
with self.assertRaises(Exception) as cm:
method(*args, **kwargs)
self.assertEqual(str(cm.exception), "Expected error")


class TestClientSyncCaptureMode(unittest.TestCase):
"""Sync-mode `_enqueue` selects the analytics submitter by `capture_mode`;
the dedicated AI endpoint always uses the legacy submitter."""

def _client(self, **kwargs):
return Client(FAKE_TEST_API_KEY, sync_mode=True, **kwargs)

@parameterized.expand(
[
("v0", None, False),
("v1", "v1", True),
]
)
def test_capture_mode_selects_sync_submitter(self, _name, capture_mode, expects_v1):
kwargs = {"capture_mode": capture_mode} if capture_mode else {}
with (
mock.patch("posthog.client.batch_post") as mock_post,
mock.patch("posthog.client.send_v1_batch") as mock_v1,
):
self._client(**kwargs).capture("evt", distinct_id="d")
if expects_v1:
mock_post.assert_not_called()
mock_v1.assert_called_once()
sent_batch = mock_v1.call_args.args[2]
self.assertEqual(len(sent_batch), 1)
self.assertEqual(sent_batch[0]["event"], "evt")
else:
mock_v1.assert_not_called()
mock_post.assert_called_once()

def test_v1_sync_forwards_config_to_submitter(self):
with (
mock.patch("posthog.client.batch_post"),
mock.patch("posthog.client.send_v1_batch") as mock_v1,
):
self._client(
capture_mode="v1",
capture_compression=CaptureCompression.GZIP,
max_retries=4,
historical_migration=True,
).capture("evt", distinct_id="d")
kwargs = mock_v1.call_args.kwargs
self.assertEqual(kwargs["compression"], CaptureCompression.GZIP)
self.assertEqual(kwargs["max_retries"], 4)
self.assertEqual(kwargs["historical_migration"], True)

def test_v1_sync_gzip_flag_falls_back_to_gzip_compression(self):
# Legacy `gzip=True` with no explicit capture_compression -> GZIP on v1.
with (
mock.patch("posthog.client.batch_post"),
mock.patch("posthog.client.send_v1_batch") as mock_v1,
):
self._client(capture_mode="v1", gzip=True).capture("evt", distinct_id="d")
self.assertEqual(
mock_v1.call_args.kwargs["compression"], CaptureCompression.GZIP
)

def test_v1_sync_dedicated_ai_event_stays_legacy(self):
# $ai_* on the dedicated AI endpoint has no v1 form.
with (
mock.patch("posthog.client.batch_post") as mock_post,
mock.patch("posthog.client.send_v1_batch") as mock_v1,
):
client = self._client(capture_mode="v1", _dedicated_ai_endpoint=True)
client.capture("$ai_generation", distinct_id="d")
mock_v1.assert_not_called()
mock_post.assert_called_once()
self.assertEqual(mock_post.call_args.kwargs["path"], "/i/v0/ai/batch/")

def test_v1_sync_dedicated_ai_analytics_event_uses_v1(self):
with (
mock.patch("posthog.client.batch_post") as mock_post,
mock.patch("posthog.client.send_v1_batch") as mock_v1,
):
client = self._client(capture_mode="v1", _dedicated_ai_endpoint=True)
client.capture("regular_event", distinct_id="d")
mock_post.assert_not_called()
mock_v1.assert_called_once()
Loading
Loading