Skip to content
49 changes: 43 additions & 6 deletions sentry_sdk/integrations/arq.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import sys

import sentry_sdk
from sentry_sdk.consts import OP, SPANSTATUS
from sentry_sdk.consts import OP, SPANDATA, SPANSTATUS
from sentry_sdk.integrations import DidNotEnable, Integration, _check_minimum_version
from sentry_sdk.integrations.logging import ignore_logger
from sentry_sdk.scope import should_send_default_pii
from sentry_sdk.traces import SegmentSource
from sentry_sdk.tracing import Transaction, TransactionSource
from sentry_sdk.tracing_utils import has_span_streaming_enabled

Check warning on line 10 in sentry_sdk/integrations/arq.py

View check run for this annotation

@sentry/warden / warden: code-review

`_capture_exception` never sets span status in streaming mode

In the streaming path, `scope.transaction` always returns `None` (and emits a `DeprecationWarning` on each access), so `_capture_exception` never sets the span status to `INTERNAL_ERROR` or `ABORTED` — job errors and retries silently keep status `ok`. In non-streaming mode the transaction is correctly marked `internal_error`/`aborted`, so this is a behavioral regression for the streaming segment span. Consider setting the status on `scope.streamed_span` when streaming is enabled.
from sentry_sdk.utils import (
SENSITIVE_DATA_SUBSTITUTE,
_register_control_flow_exception,
capture_internal_exceptions,
ensure_integration_enabled,
event_from_exception,
Expand Down Expand Up @@ -59,6 +62,8 @@
patch_run_job()
patch_create_worker()

_register_control_flow_exception(ARQ_CONTROL_FLOW_EXCEPTIONS) # type: ignore
Comment thread
alexander-alderman-webb marked this conversation as resolved.

ignore_logger("arq.worker")


Expand All @@ -69,10 +74,20 @@
async def _sentry_enqueue_job(
self: "ArqRedis", function: str, *args: "Any", **kwargs: "Any"
) -> "Optional[Job]":
integration = sentry_sdk.get_client().get_integration(ArqIntegration)
if integration is None:
client = sentry_sdk.get_client()
if client.get_integration(ArqIntegration) is None:
return await old_enqueue_job(self, function, *args, **kwargs)

if has_span_streaming_enabled(client.options):
with sentry_sdk.traces.start_span(
name=function,
attributes={
"sentry.op": OP.QUEUE_SUBMIT_ARQ,
"sentry.origin": ArqIntegration.origin,
},
):
return await old_enqueue_job(self, function, *args, **kwargs)

with sentry_sdk.start_span(
op=OP.QUEUE_SUBMIT_ARQ, name=function, origin=ArqIntegration.origin
):
Expand All @@ -86,14 +101,27 @@
old_run_job = Worker.run_job

async def _sentry_run_job(self: "Worker", job_id: str, score: int) -> None:
integration = sentry_sdk.get_client().get_integration(ArqIntegration)
if integration is None:
client = sentry_sdk.get_client()
if client.get_integration(ArqIntegration) is None:
return await old_run_job(self, job_id, score)

with sentry_sdk.isolation_scope() as scope:
scope._name = "arq"
scope.clear_breadcrumbs()

if has_span_streaming_enabled(client.options):
with sentry_sdk.traces.start_span(
name="unknown arq task",
attributes={
"sentry.op": OP.QUEUE_TASK_ARQ,
"sentry.origin": ArqIntegration.origin,
"sentry.span.source": SegmentSource.TASK,
Comment thread
alexander-alderman-webb marked this conversation as resolved.
SPANDATA.MESSAGING_MESSAGE_ID: job_id,
},
parent_span=None,
):
return await old_run_job(self, job_id, score)

Check warning on line 123 in sentry_sdk/integrations/arq.py

View check run for this annotation

@sentry/warden / warden: find-bugs

[VYF-94L] Span error status never set in streaming path when job throws an exception (additional location)

`_capture_exception` only sets error status via `scope.transaction.set_status(SPANSTATUS.INTERNAL_ERROR)`, which is skipped when `scope.transaction is None` — as it always is in the streaming path. Failed jobs will appear as successful spans.

transaction = Transaction(
name="unknown arq task",
status="ok",
Expand Down Expand Up @@ -163,10 +191,19 @@
async def _sentry_coroutine(
ctx: "Dict[Any, Any]", *args: "Any", **kwargs: "Any"
) -> "Any":
integration = sentry_sdk.get_client().get_integration(ArqIntegration)
Comment thread
alexander-alderman-webb marked this conversation as resolved.
client = sentry_sdk.get_client()
integration = client.get_integration(ArqIntegration)
if integration is None:
return await coroutine(ctx, *args, **kwargs)

if has_span_streaming_enabled(client.options):
scope = sentry_sdk.get_current_scope()
span = scope.streamed_span
if span is not None:
span.name = name

scope.set_transaction_name(name)

Check warning on line 205 in sentry_sdk/integrations/arq.py

View check run for this annotation

@sentry/warden / warden: find-bugs

Span error status never set in streaming path when job throws an exception

`_capture_exception` only sets error status via `scope.transaction.set_status(SPANSTATUS.INTERNAL_ERROR)`, which is skipped when `scope.transaction is None` — as it always is in the streaming path. Failed jobs will appear as successful spans.

sentry_sdk.get_isolation_scope().add_event_processor(
_make_event_processor({**ctx, "job_name": name}, *args, **kwargs)
)
Expand Down
Loading
Loading