From 23c3afb3cac7ddf9f866877b54a1465bd786605d Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Fri, 5 Jun 2026 08:57:48 +0200 Subject: [PATCH 1/8] feat(arq): Support span streaming --- sentry_sdk/integrations/arq.py | 47 +++- tests/integrations/arq/test_arq.py | 409 +++++++++++++++++++++-------- 2 files changed, 346 insertions(+), 110 deletions(-) diff --git a/sentry_sdk/integrations/arq.py b/sentry_sdk/integrations/arq.py index a4851396dd..d92bcc25e3 100644 --- a/sentry_sdk/integrations/arq.py +++ b/sentry_sdk/integrations/arq.py @@ -1,11 +1,13 @@ import sys import sentry_sdk -from sentry_sdk.consts import OP, SPANSTATUS +from sentry_sdk.consts import OP, SPANDATA, SPANSTATUS from sentry_sdk.integrations import DidNotEnable, Integration, _check_minimum_version from sentry_sdk.integrations.logging import ignore_logger from sentry_sdk.scope import should_send_default_pii +from sentry_sdk.traces import SegmentSource, SpanStatus from sentry_sdk.tracing import Transaction, TransactionSource +from sentry_sdk.tracing_utils import has_span_streaming_enabled from sentry_sdk.utils import ( SENSITIVE_DATA_SUBSTITUTE, capture_internal_exceptions, @@ -69,10 +71,20 @@ def patch_enqueue_job() -> None: async def _sentry_enqueue_job( self: "ArqRedis", function: str, *args: "Any", **kwargs: "Any" ) -> "Optional[Job]": - integration = sentry_sdk.get_client().get_integration(ArqIntegration) - if integration is None: + client = sentry_sdk.get_client() + if client.get_integration(ArqIntegration) is None: return await old_enqueue_job(self, function, *args, **kwargs) + if has_span_streaming_enabled(client.options): + with sentry_sdk.traces.start_span( + name=function, + attributes={ + "sentry.op": OP.QUEUE_SUBMIT_ARQ, + "sentry.origin": ArqIntegration.origin, + }, + ): + return await old_enqueue_job(self, function, *args, **kwargs) + with sentry_sdk.start_span( op=OP.QUEUE_SUBMIT_ARQ, name=function, origin=ArqIntegration.origin ): @@ -86,14 +98,27 @@ def patch_run_job() -> None: old_run_job = Worker.run_job async def _sentry_run_job(self: "Worker", job_id: str, score: int) -> None: - integration = sentry_sdk.get_client().get_integration(ArqIntegration) - if integration is None: + client = sentry_sdk.get_client() + if client.get_integration(ArqIntegration) is None: return await old_run_job(self, job_id, score) with sentry_sdk.isolation_scope() as scope: scope._name = "arq" scope.clear_breadcrumbs() + if has_span_streaming_enabled(client.options): + with sentry_sdk.traces.start_span( + name="unknown arq task", + attributes={ + "sentry.op": OP.QUEUE_TASK_ARQ, + "sentry.origin": ArqIntegration.origin, + "sentry.span.source": SegmentSource.TASK, + SPANDATA.MESSAGING_MESSAGE_ID: job_id, + }, + parent_span=None, + ): + return await old_run_job(self, job_id, score) + transaction = Transaction( name="unknown arq task", status="ok", @@ -111,7 +136,13 @@ async def _sentry_run_job(self: "Worker", job_id: str, score: int) -> None: def _capture_exception(exc_info: "ExcInfo") -> None: scope = sentry_sdk.get_current_scope() - if scope.transaction is not None: + span_streaming = has_span_streaming_enabled(sentry_sdk.get_client().options) + if span_streaming and scope.streamed_span is not None: + if exc_info[0] in ARQ_CONTROL_FLOW_EXCEPTIONS: + return + + scope.streamed_span.status = SpanStatus.ERROR + elif not span_streaming and scope.transaction is not None: if exc_info[0] in ARQ_CONTROL_FLOW_EXCEPTIONS: scope.transaction.set_status(SPANSTATUS.ABORTED) return @@ -164,6 +195,10 @@ async def _sentry_coroutine( if integration is None: return await coroutine(ctx, *args, **kwargs) + span = sentry_sdk.traces.get_current_span() + if span is not None: + span.name = name + sentry_sdk.get_isolation_scope().add_event_processor( _make_event_processor({**ctx, "job_name": name}, *args, **kwargs) ) diff --git a/tests/integrations/arq/test_arq.py b/tests/integrations/arq/test_arq.py index 49dd48f85e..40526a88d7 100644 --- a/tests/integrations/arq/test_arq.py +++ b/tests/integrations/arq/test_arq.py @@ -9,6 +9,7 @@ from arq.utils import timestamp_ms from fakeredis.aioredis import FakeRedis +import sentry_sdk from sentry_sdk import get_client, start_transaction from sentry_sdk.integrations.arq import ArqIntegration @@ -41,6 +42,7 @@ def info(self, section): @pytest.fixture def init_arq(sentry_init): def inner( + span_streaming, cls_functions=None, cls_cron_jobs=None, kw_functions=None, @@ -60,6 +62,7 @@ def inner( integrations=[ArqIntegration()], traces_sample_rate=1.0, send_default_pii=True, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) server = FakeRedis() @@ -86,6 +89,7 @@ class WorkerSettings: @pytest.fixture def init_arq_with_dict_settings(sentry_init): def inner( + span_streaming, cls_functions=None, cls_cron_jobs=None, kw_functions=None, @@ -105,6 +109,7 @@ def inner( integrations=[ArqIntegration()], traces_sample_rate=1.0, send_default_pii=True, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) server = FakeRedis() @@ -134,6 +139,7 @@ def init_arq_with_kwarg_settings(sentry_init): """Test fixture that passes settings_cls as keyword argument only.""" def inner( + span_streaming, cls_functions=None, cls_cron_jobs=None, kw_functions=None, @@ -153,6 +159,7 @@ def inner( integrations=[ArqIntegration()], traces_sample_rate=1.0, send_default_pii=True, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) server = FakeRedis() @@ -182,7 +189,8 @@ class WorkerSettings: "init_arq_settings", ["init_arq", "init_arq_with_dict_settings", "init_arq_with_kwarg_settings"], ) -async def test_job_result(init_arq_settings, request): +@pytest.mark.parametrize("span_streaming", [True, False]) +async def test_job_result(init_arq_settings, request, span_streaming): async def increase(ctx, num): return num + 1 @@ -190,7 +198,7 @@ async def increase(ctx, num): increase.__qualname__ = increase.__name__ - pool, worker = init_fixture_method([increase]) + pool, worker = init_fixture_method(span_streaming, [increase]) job = await pool.enqueue_job("increase", 3) @@ -208,7 +216,14 @@ async def increase(ctx, num): @pytest.mark.parametrize( "init_arq_settings", ["init_arq", "init_arq_with_dict_settings"] ) -async def test_job_retry(capture_events, init_arq_settings, request): +@pytest.mark.parametrize("span_streaming", [True, False]) +async def test_job_retry( + capture_events, + capture_items, + init_arq_settings, + request, + span_streaming, +): async def retry_job(ctx): if ctx["job_try"] < 2: raise arq.worker.Retry @@ -217,27 +232,48 @@ async def retry_job(ctx): retry_job.__qualname__ = retry_job.__name__ - pool, worker = init_fixture_method([retry_job]) + pool, worker = init_fixture_method(span_streaming, [retry_job]) job = await pool.enqueue_job("retry_job") - events = capture_events() + if span_streaming: + items = capture_items("event", "transaction", "span") - await worker.run_job(job.job_id, timestamp_ms()) + await worker.run_job(job.job_id, timestamp_ms()) - event = events.pop(0) - assert event["contexts"]["trace"]["status"] == "aborted" - assert event["transaction"] == "retry_job" - assert event["tags"]["arq_task_id"] == job.job_id - assert event["extra"]["arq-job"]["retry"] == 1 + sentry_sdk.flush() + spans = [item.payload for item in items if item.type == "span"] - await worker.run_job(job.job_id, timestamp_ms()) + assert spans[4]["attributes"]["sentry.op"] == "queue.task.arq" + assert spans[4]["status"] == "ok" + assert spans[4]["name"] == "retry_job" + + await worker.run_job(job.job_id, timestamp_ms()) + + sentry_sdk.flush() + spans = [item.payload for item in items if item.type == "span"] + + assert spans[7]["attributes"]["sentry.op"] == "queue.task.arq" + assert spans[7]["status"] == "ok" + assert spans[7]["name"] == "retry_job" + else: + events = capture_events() + + await worker.run_job(job.job_id, timestamp_ms()) - event = events.pop(0) - assert event["contexts"]["trace"]["status"] == "ok" - assert event["transaction"] == "retry_job" - assert event["tags"]["arq_task_id"] == job.job_id - assert event["extra"]["arq-job"]["retry"] == 2 + event = events.pop(0) + assert event["contexts"]["trace"]["status"] == "aborted" + assert event["transaction"] == "retry_job" + assert event["tags"]["arq_task_id"] == job.job_id + assert event["extra"]["arq-job"]["retry"] == 1 + + await worker.run_job(job.job_id, timestamp_ms()) + + event = events.pop(0) + assert event["contexts"]["trace"]["status"] == "ok" + assert event["transaction"] == "retry_job" + assert event["tags"]["arq_task_id"] == job.job_id + assert event["extra"]["arq-job"]["retry"] == 2 @pytest.mark.parametrize( @@ -248,8 +284,15 @@ async def retry_job(ctx): "init_arq_settings", ["init_arq", "init_arq_with_dict_settings"] ) @pytest.mark.asyncio +@pytest.mark.parametrize("span_streaming", [True, False]) async def test_job_transaction( - capture_events, init_arq_settings, source, job_fails, request + capture_events, + capture_items, + init_arq_settings, + source, + job_fails, + request, + span_streaming, ): async def division(_, a, b=0): return a / b @@ -265,65 +308,126 @@ async def division(_, a, b=0): functions_key, cron_jobs_key = source pool, worker = init_fixture_method( - **{functions_key: [division], cron_jobs_key: [cron_job]} + span_streaming, **{functions_key: [division], cron_jobs_key: [cron_job]} ) - events = capture_events() + if span_streaming: + items = capture_items("event", "transaction", "span") - job = await pool.enqueue_job("division", 1, b=int(not job_fails)) - await worker.run_job(job.job_id, timestamp_ms()) + job = await pool.enqueue_job("division", 1, b=int(not job_fails)) + await worker.run_job(job.job_id, timestamp_ms()) - loop = asyncio.get_event_loop() - task = loop.create_task(worker.async_run()) - await asyncio.sleep(1) + loop = asyncio.get_event_loop() + task = loop.create_task(worker.async_run()) + await asyncio.sleep(1) - task.cancel() + task.cancel() - await worker.close() + await worker.close() - if job_fails: - error_func_event = events.pop(0) - error_cron_event = events.pop(1) + events = [item.payload for item in items if item.type == "event"] + if job_fails: + error_func_event = events.pop(0) + error_cron_event = events.pop(0) - assert error_func_event["exception"]["values"][0]["type"] == "ZeroDivisionError" - assert error_func_event["exception"]["values"][0]["mechanism"]["type"] == "arq" + assert ( + error_func_event["exception"]["values"][0]["type"] + == "ZeroDivisionError" + ) + assert ( + error_func_event["exception"]["values"][0]["mechanism"]["type"] == "arq" + ) - func_extra = error_func_event["extra"]["arq-job"] - assert func_extra["task"] == "division" + func_extra = error_func_event["extra"]["arq-job"] + assert func_extra["task"] == "division" - assert error_cron_event["exception"]["values"][0]["type"] == "ZeroDivisionError" - assert error_cron_event["exception"]["values"][0]["mechanism"]["type"] == "arq" + assert ( + error_cron_event["exception"]["values"][0]["type"] + == "ZeroDivisionError" + ) + assert ( + error_cron_event["exception"]["values"][0]["mechanism"]["type"] == "arq" + ) - cron_extra = error_cron_event["extra"]["arq-job"] - assert cron_extra["task"] == "cron:division" + cron_extra = error_cron_event["extra"]["arq-job"] + assert cron_extra["task"] == "cron:division" + + sentry_sdk.flush() + spans = [item.payload for item in items if item.type == "span"] + + assert spans[4]["attributes"]["sentry.op"] == "queue.task.arq" + assert spans[4]["name"] == "division" + assert spans[4]["attributes"]["sentry.span.source"] == "task" - [func_event, cron_event] = events + assert spans[15]["attributes"]["sentry.op"] == "queue.task.arq" + assert spans[15]["name"] == "cron:division" + else: + events = capture_events() - assert func_event["type"] == "transaction" - assert func_event["transaction"] == "division" - assert func_event["transaction_info"] == {"source": "task"} + job = await pool.enqueue_job("division", 1, b=int(not job_fails)) + await worker.run_job(job.job_id, timestamp_ms()) - assert "arq_task_id" in func_event["tags"] - assert "arq_task_retry" in func_event["tags"] + loop = asyncio.get_event_loop() + task = loop.create_task(worker.async_run()) + await asyncio.sleep(1) - func_extra = func_event["extra"]["arq-job"] + task.cancel() - assert func_extra["task"] == "division" - assert func_extra["kwargs"] == {"b": int(not job_fails)} - assert func_extra["retry"] == 1 + await worker.close() - assert cron_event["type"] == "transaction" - assert cron_event["transaction"] == "cron:division" - assert cron_event["transaction_info"] == {"source": "task"} + if job_fails: + error_func_event = events.pop(0) + error_cron_event = events.pop(1) - assert "arq_task_id" in cron_event["tags"] - assert "arq_task_retry" in cron_event["tags"] + assert ( + error_func_event["exception"]["values"][0]["type"] + == "ZeroDivisionError" + ) + assert ( + error_func_event["exception"]["values"][0]["mechanism"]["type"] == "arq" + ) - cron_extra = cron_event["extra"]["arq-job"] + func_extra = error_func_event["extra"]["arq-job"] + assert func_extra["task"] == "division" - assert cron_extra["task"] == "cron:division" - assert cron_extra["kwargs"] == {} - assert cron_extra["retry"] == 1 + assert ( + error_cron_event["exception"]["values"][0]["type"] + == "ZeroDivisionError" + ) + assert ( + error_cron_event["exception"]["values"][0]["mechanism"]["type"] == "arq" + ) + + cron_extra = error_cron_event["extra"]["arq-job"] + assert cron_extra["task"] == "cron:division" + + [func_event, cron_event] = events + + assert func_event["type"] == "transaction" + assert func_event["transaction"] == "division" + assert func_event["transaction_info"] == {"source": "task"} + + assert "arq_task_id" in func_event["tags"] + assert "arq_task_retry" in func_event["tags"] + + func_extra = func_event["extra"]["arq-job"] + + assert func_extra["task"] == "division" + assert func_extra["kwargs"] == {"b": int(not job_fails)} + assert func_extra["retry"] == 1 + + assert cron_event["type"] == "transaction" + assert cron_event["transaction"] == "cron:division" + assert cron_event["transaction_info"] == {"source": "task"} + + assert "arq_task_id" in cron_event["tags"] + assert "arq_task_retry" in cron_event["tags"] + + cron_extra = cron_event["extra"]["arq-job"] + + assert cron_extra["task"] == "cron:division" + assert cron_extra["kwargs"] == {} + assert cron_extra["retry"] == 1 @pytest.mark.parametrize("source", ["cls_functions", "kw_functions"]) @@ -331,34 +435,61 @@ async def division(_, a, b=0): "init_arq_settings", ["init_arq", "init_arq_with_dict_settings"] ) @pytest.mark.asyncio -async def test_enqueue_job(capture_events, init_arq_settings, source, request): +@pytest.mark.parametrize("span_streaming", [True, False]) +async def test_enqueue_job( + capture_events, + capture_items, + init_arq_settings, + source, + request, + span_streaming, +): async def dummy_job(_): pass init_fixture_method = request.getfixturevalue(init_arq_settings) - pool, _ = init_fixture_method(**{source: [dummy_job]}) + pool, _ = init_fixture_method(span_streaming, **{source: [dummy_job]}) + + if span_streaming: + items = capture_items("event", "transaction", "span") - events = capture_events() + with sentry_sdk.traces.start_span(name="custom parent") as span: + await pool.enqueue_job("dummy_job") - with start_transaction() as transaction: - await pool.enqueue_job("dummy_job") + sentry_sdk.flush() + spans = [item.payload for item in items if item.type == "span"] - (event,) = events + assert spans[2]["is_segment"] is True + assert spans[2]["trace_id"] == span.trace_id + assert spans[2]["span_id"] == span.span_id - assert event["contexts"]["trace"]["trace_id"] == transaction.trace_id - assert event["contexts"]["trace"]["span_id"] == transaction.span_id + assert spans[1]["attributes"]["sentry.op"] == "queue.submit.arq" + assert spans[1]["name"] == "dummy_job" + else: + events = capture_events() - assert len(event["spans"]) - assert event["spans"][0]["op"] == "queue.submit.arq" - assert event["spans"][0]["description"] == "dummy_job" + with start_transaction() as transaction: + await pool.enqueue_job("dummy_job") + + (event,) = events + + assert event["contexts"]["trace"]["trace_id"] == transaction.trace_id + assert event["contexts"]["trace"]["span_id"] == transaction.span_id + + assert len(event["spans"]) + assert event["spans"][0]["op"] == "queue.submit.arq" + assert event["spans"][0]["description"] == "dummy_job" @pytest.mark.asyncio @pytest.mark.parametrize( "init_arq_settings", ["init_arq", "init_arq_with_dict_settings"] ) -async def test_execute_job_without_integration(init_arq_settings, request): +@pytest.mark.parametrize("span_streaming", [True, False]) +async def test_execute_job_without_integration( + init_arq_settings, request, span_streaming +): async def dummy_job(_ctx): pass @@ -366,7 +497,7 @@ async def dummy_job(_ctx): dummy_job.__qualname__ = dummy_job.__name__ - pool, worker = init_fixture_method([dummy_job]) + pool, worker = init_fixture_method(span_streaming, [dummy_job]) # remove the integration to trigger the edge case get_client().integrations.pop("arq") @@ -382,29 +513,55 @@ async def dummy_job(_ctx): "init_arq_settings", ["init_arq", "init_arq_with_dict_settings"] ) @pytest.mark.asyncio -async def test_span_origin_producer(capture_events, init_arq_settings, source, request): +@pytest.mark.parametrize("span_streaming", [True, False]) +async def test_span_origin_producer( + capture_events, + capture_items, + init_arq_settings, + source, + request, + span_streaming, +): async def dummy_job(_): pass init_fixture_method = request.getfixturevalue(init_arq_settings) - pool, _ = init_fixture_method(**{source: [dummy_job]}) + pool, _ = init_fixture_method(span_streaming, **{source: [dummy_job]}) + + if span_streaming: + items = capture_items("event", "transaction", "span") + + with sentry_sdk.traces.start_span(name="custom parent"): + await pool.enqueue_job("dummy_job") - events = capture_events() + sentry_sdk.flush() + spans = [item.payload for item in items if item.type == "span"] + assert spans[2]["attributes"]["sentry.origin"] == "manual" + assert spans[1]["attributes"]["sentry.origin"] == "auto.queue.arq" + else: + events = capture_events() - with start_transaction(): - await pool.enqueue_job("dummy_job") + with start_transaction(): + await pool.enqueue_job("dummy_job") - (event,) = events - assert event["contexts"]["trace"]["origin"] == "manual" - assert event["spans"][0]["origin"] == "auto.queue.arq" + (event,) = events + assert event["contexts"]["trace"]["origin"] == "manual" + assert event["spans"][0]["origin"] == "auto.queue.arq" @pytest.mark.asyncio @pytest.mark.parametrize( "init_arq_settings", ["init_arq", "init_arq_with_dict_settings"] ) -async def test_span_origin_consumer(capture_events, init_arq_settings, request): +@pytest.mark.parametrize("span_streaming", [True, False]) +async def test_span_origin_consumer( + capture_events, + capture_items, + init_arq_settings, + request, + span_streaming, +): async def job(ctx): pass @@ -412,23 +569,44 @@ async def job(ctx): job.__qualname__ = job.__name__ - pool, worker = init_fixture_method([job]) + pool, worker = init_fixture_method(span_streaming, [job]) - job = await pool.enqueue_job("retry_job") + if span_streaming: + job = await pool.enqueue_job("retry_job") - events = capture_events() + items = capture_items("event", "transaction", "span") - await worker.run_job(job.job_id, timestamp_ms()) + await worker.run_job(job.job_id, timestamp_ms()) + + sentry_sdk.flush() + spans = [item.payload for item in items if item.type == "span"] + + assert spans[4]["attributes"]["sentry.op"] == "queue.task.arq" + assert spans[4]["attributes"]["sentry.origin"] == "auto.queue.arq" + assert spans[3]["attributes"]["sentry.origin"] == "auto.db.redis" + assert spans[2]["attributes"]["sentry.origin"] == "auto.db.redis" + else: + job = await pool.enqueue_job("retry_job") + + events = capture_events() + + await worker.run_job(job.job_id, timestamp_ms()) - (event,) = events + (event,) = events - assert event["contexts"]["trace"]["origin"] == "auto.queue.arq" - assert event["spans"][0]["origin"] == "auto.db.redis" - assert event["spans"][1]["origin"] == "auto.db.redis" + assert event["contexts"]["trace"]["origin"] == "auto.queue.arq" + assert event["spans"][0]["origin"] == "auto.db.redis" + assert event["spans"][1]["origin"] == "auto.db.redis" @pytest.mark.asyncio -async def test_job_concurrency(capture_events, init_arq): +@pytest.mark.parametrize("span_streaming", [True, False]) +async def test_job_concurrency( + capture_events, + capture_items, + init_arq, + span_streaming, +): """ 10 - division starts 70 - sleepy starts @@ -447,26 +625,49 @@ async def division(_): sleepy.__qualname__ = sleepy.__name__ division.__qualname__ = division.__name__ - pool, worker = init_arq([sleepy, division]) + pool, worker = init_arq(span_streaming, [sleepy, division]) - events = capture_events() + if span_streaming: + items = capture_items("event", "transaction", "span") - await pool.enqueue_job( - "division", _job_id="123", _defer_by=timedelta(milliseconds=10) - ) - await pool.enqueue_job( - "sleepy", _job_id="456", _defer_by=timedelta(milliseconds=70) - ) + await pool.enqueue_job( + "division", _job_id="123", _defer_by=timedelta(milliseconds=10) + ) + await pool.enqueue_job( + "sleepy", _job_id="456", _defer_by=timedelta(milliseconds=70) + ) + + loop = asyncio.get_event_loop() + task = loop.create_task(worker.async_run()) + await asyncio.sleep(1) + + task.cancel() + + await worker.close() + + events = [item.payload for item in items if item.type == "event"] + exception_event = events[0] + assert exception_event["exception"]["values"][0]["type"] == "ZeroDivisionError" + else: + events = capture_events() + + await pool.enqueue_job( + "division", _job_id="123", _defer_by=timedelta(milliseconds=10) + ) + await pool.enqueue_job( + "sleepy", _job_id="456", _defer_by=timedelta(milliseconds=70) + ) + + loop = asyncio.get_event_loop() + task = loop.create_task(worker.async_run()) + await asyncio.sleep(1) - loop = asyncio.get_event_loop() - task = loop.create_task(worker.async_run()) - await asyncio.sleep(1) + task.cancel() - task.cancel() + await worker.close() - await worker.close() + exception_event = events[1] + assert exception_event["exception"]["values"][0]["type"] == "ZeroDivisionError" + assert exception_event["transaction"] == "division" - exception_event = events[1] - assert exception_event["exception"]["values"][0]["type"] == "ZeroDivisionError" - assert exception_event["transaction"] == "division" assert exception_event["extra"]["arq-job"]["task"] == "division" From 56b5d2a789841e86e6c581ea72656e02dfe5b02d Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Fri, 5 Jun 2026 09:01:53 +0200 Subject: [PATCH 2/8] simplify tests --- tests/integrations/arq/test_arq.py | 49 +++++++++++++----------------- 1 file changed, 21 insertions(+), 28 deletions(-) diff --git a/tests/integrations/arq/test_arq.py b/tests/integrations/arq/test_arq.py index 40526a88d7..660d173052 100644 --- a/tests/integrations/arq/test_arq.py +++ b/tests/integrations/arq/test_arq.py @@ -237,12 +237,12 @@ async def retry_job(ctx): job = await pool.enqueue_job("retry_job") if span_streaming: - items = capture_items("event", "transaction", "span") + items = capture_items("span") await worker.run_job(job.job_id, timestamp_ms()) sentry_sdk.flush() - spans = [item.payload for item in items if item.type == "span"] + spans = [item.payload for item in items] assert spans[4]["attributes"]["sentry.op"] == "queue.task.arq" assert spans[4]["status"] == "ok" @@ -251,7 +251,7 @@ async def retry_job(ctx): await worker.run_job(job.job_id, timestamp_ms()) sentry_sdk.flush() - spans = [item.payload for item in items if item.type == "span"] + spans = [item.payload for item in items] assert spans[7]["attributes"]["sentry.op"] == "queue.task.arq" assert spans[7]["status"] == "ok" @@ -311,10 +311,11 @@ async def division(_, a, b=0): span_streaming, **{functions_key: [division], cron_jobs_key: [cron_job]} ) + job = await pool.enqueue_job("division", 1, b=int(not job_fails)) + if span_streaming: - items = capture_items("event", "transaction", "span") + items = capture_items("event", "span") - job = await pool.enqueue_job("division", 1, b=int(not job_fails)) await worker.run_job(job.job_id, timestamp_ms()) loop = asyncio.get_event_loop() @@ -364,7 +365,6 @@ async def division(_, a, b=0): else: events = capture_events() - job = await pool.enqueue_job("division", 1, b=int(not job_fails)) await worker.run_job(job.job_id, timestamp_ms()) loop = asyncio.get_event_loop() @@ -452,13 +452,13 @@ async def dummy_job(_): pool, _ = init_fixture_method(span_streaming, **{source: [dummy_job]}) if span_streaming: - items = capture_items("event", "transaction", "span") + items = capture_items("span") with sentry_sdk.traces.start_span(name="custom parent") as span: await pool.enqueue_job("dummy_job") sentry_sdk.flush() - spans = [item.payload for item in items if item.type == "span"] + spans = [item.payload for item in items] assert spans[2]["is_segment"] is True assert spans[2]["trace_id"] == span.trace_id @@ -530,13 +530,13 @@ async def dummy_job(_): pool, _ = init_fixture_method(span_streaming, **{source: [dummy_job]}) if span_streaming: - items = capture_items("event", "transaction", "span") + items = capture_items("span") with sentry_sdk.traces.start_span(name="custom parent"): await pool.enqueue_job("dummy_job") sentry_sdk.flush() - spans = [item.payload for item in items if item.type == "span"] + spans = [item.payload for item in items] assert spans[2]["attributes"]["sentry.origin"] == "manual" assert spans[1]["attributes"]["sentry.origin"] == "auto.queue.arq" else: @@ -574,12 +574,12 @@ async def job(ctx): if span_streaming: job = await pool.enqueue_job("retry_job") - items = capture_items("event", "transaction", "span") + items = capture_items("span") await worker.run_job(job.job_id, timestamp_ms()) sentry_sdk.flush() - spans = [item.payload for item in items if item.type == "span"] + spans = [item.payload for item in items] assert spans[4]["attributes"]["sentry.op"] == "queue.task.arq" assert spans[4]["attributes"]["sentry.origin"] == "auto.queue.arq" @@ -627,15 +627,15 @@ async def division(_): pool, worker = init_arq(span_streaming, [sleepy, division]) - if span_streaming: - items = capture_items("event", "transaction", "span") + await pool.enqueue_job( + "division", _job_id="123", _defer_by=timedelta(milliseconds=10) + ) + await pool.enqueue_job( + "sleepy", _job_id="456", _defer_by=timedelta(milliseconds=70) + ) - await pool.enqueue_job( - "division", _job_id="123", _defer_by=timedelta(milliseconds=10) - ) - await pool.enqueue_job( - "sleepy", _job_id="456", _defer_by=timedelta(milliseconds=70) - ) + if span_streaming: + items = capture_items("event") loop = asyncio.get_event_loop() task = loop.create_task(worker.async_run()) @@ -645,19 +645,12 @@ async def division(_): await worker.close() - events = [item.payload for item in items if item.type == "event"] + events = [item.payload for item in items] exception_event = events[0] assert exception_event["exception"]["values"][0]["type"] == "ZeroDivisionError" else: events = capture_events() - await pool.enqueue_job( - "division", _job_id="123", _defer_by=timedelta(milliseconds=10) - ) - await pool.enqueue_job( - "sleepy", _job_id="456", _defer_by=timedelta(milliseconds=70) - ) - loop = asyncio.get_event_loop() task = loop.create_task(worker.async_run()) await asyncio.sleep(1) From 0f1d9b098b88964bb5354bef1839294e7442247e Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Fri, 5 Jun 2026 09:30:00 +0200 Subject: [PATCH 3/8] add assertion --- tests/integrations/arq/test_arq.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/integrations/arq/test_arq.py b/tests/integrations/arq/test_arq.py index 660d173052..d18e90d4eb 100644 --- a/tests/integrations/arq/test_arq.py +++ b/tests/integrations/arq/test_arq.py @@ -648,6 +648,7 @@ async def division(_): events = [item.payload for item in items] exception_event = events[0] assert exception_event["exception"]["values"][0]["type"] == "ZeroDivisionError" + assert exception_event["transaction"] == "division" else: events = capture_events() From ac4f7ba48228bb35b24480b312027c816c3ba535 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Fri, 5 Jun 2026 09:37:15 +0200 Subject: [PATCH 4/8] fix tests --- sentry_sdk/integrations/arq.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/sentry_sdk/integrations/arq.py b/sentry_sdk/integrations/arq.py index d92bcc25e3..0ba87896b9 100644 --- a/sentry_sdk/integrations/arq.py +++ b/sentry_sdk/integrations/arq.py @@ -191,13 +191,18 @@ def _wrap_coroutine(name: str, coroutine: "WorkerCoroutine") -> "WorkerCoroutine async def _sentry_coroutine( ctx: "Dict[Any, Any]", *args: "Any", **kwargs: "Any" ) -> "Any": - integration = sentry_sdk.get_client().get_integration(ArqIntegration) + client = sentry_sdk.get_client() + integration = client.get_integration(ArqIntegration) if integration is None: return await coroutine(ctx, *args, **kwargs) - span = sentry_sdk.traces.get_current_span() - if span is not None: - span.name = name + if has_span_streaming_enabled(client.options): + span = sentry_sdk.traces.get_current_span() + if span is not None: + span.name = name + + scope = sentry_sdk.get_current_scope() + scope.set_transaction_name(name) sentry_sdk.get_isolation_scope().add_event_processor( _make_event_processor({**ctx, "job_name": name}, *args, **kwargs) From 70a0e3fd9432dc5bcd055ad6c9c0eb108115d2c8 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Fri, 5 Jun 2026 09:38:58 +0200 Subject: [PATCH 5/8] simplify --- sentry_sdk/integrations/arq.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sentry_sdk/integrations/arq.py b/sentry_sdk/integrations/arq.py index 0ba87896b9..99ee327c16 100644 --- a/sentry_sdk/integrations/arq.py +++ b/sentry_sdk/integrations/arq.py @@ -197,11 +197,11 @@ async def _sentry_coroutine( return await coroutine(ctx, *args, **kwargs) if has_span_streaming_enabled(client.options): - span = sentry_sdk.traces.get_current_span() + scope = sentry_sdk.get_current_scope() + span = scope.streamed_span if span is not None: span.name = name - scope = sentry_sdk.get_current_scope() scope.set_transaction_name(name) sentry_sdk.get_isolation_scope().add_event_processor( From 3174afefad72019d103baf5314967b5506b8d713 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Fri, 5 Jun 2026 09:51:48 +0200 Subject: [PATCH 6/8] do not manually set error --- sentry_sdk/integrations/arq.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/sentry_sdk/integrations/arq.py b/sentry_sdk/integrations/arq.py index 99ee327c16..012a343e68 100644 --- a/sentry_sdk/integrations/arq.py +++ b/sentry_sdk/integrations/arq.py @@ -5,11 +5,12 @@ from sentry_sdk.integrations import DidNotEnable, Integration, _check_minimum_version from sentry_sdk.integrations.logging import ignore_logger from sentry_sdk.scope import should_send_default_pii -from sentry_sdk.traces import SegmentSource, SpanStatus +from sentry_sdk.traces import SegmentSource from sentry_sdk.tracing import Transaction, TransactionSource from sentry_sdk.tracing_utils import has_span_streaming_enabled from sentry_sdk.utils import ( SENSITIVE_DATA_SUBSTITUTE, + _register_control_flow_exception, capture_internal_exceptions, ensure_integration_enabled, event_from_exception, @@ -61,6 +62,8 @@ def setup_once() -> None: patch_run_job() patch_create_worker() + _register_control_flow_exception(ARQ_CONTROL_FLOW_EXCEPTIONS) + ignore_logger("arq.worker") @@ -136,13 +139,7 @@ async def _sentry_run_job(self: "Worker", job_id: str, score: int) -> None: def _capture_exception(exc_info: "ExcInfo") -> None: scope = sentry_sdk.get_current_scope() - span_streaming = has_span_streaming_enabled(sentry_sdk.get_client().options) - if span_streaming and scope.streamed_span is not None: - if exc_info[0] in ARQ_CONTROL_FLOW_EXCEPTIONS: - return - - scope.streamed_span.status = SpanStatus.ERROR - elif not span_streaming and scope.transaction is not None: + if scope.transaction is not None: if exc_info[0] in ARQ_CONTROL_FLOW_EXCEPTIONS: scope.transaction.set_status(SPANSTATUS.ABORTED) return From d14aa205f4a05896170649e205d5de1d4acc13ad Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Fri, 5 Jun 2026 09:53:16 +0200 Subject: [PATCH 7/8] . --- sentry_sdk/integrations/arq.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sentry_sdk/integrations/arq.py b/sentry_sdk/integrations/arq.py index 012a343e68..e479afa060 100644 --- a/sentry_sdk/integrations/arq.py +++ b/sentry_sdk/integrations/arq.py @@ -62,7 +62,7 @@ def setup_once() -> None: patch_run_job() patch_create_worker() - _register_control_flow_exception(ARQ_CONTROL_FLOW_EXCEPTIONS) + _register_control_flow_exception(ARQ_CONTROL_FLOW_EXCEPTIONS) # type: ignore ignore_logger("arq.worker") From 30ff4da4e7fbce83b7b1a13169c122c4db7e0c78 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Fri, 5 Jun 2026 10:06:30 +0200 Subject: [PATCH 8/8] fix(arq): Never capture control flow exceptions --- sentry_sdk/integrations/arq.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sentry_sdk/integrations/arq.py b/sentry_sdk/integrations/arq.py index a4851396dd..bbdb74bc8a 100644 --- a/sentry_sdk/integrations/arq.py +++ b/sentry_sdk/integrations/arq.py @@ -118,6 +118,9 @@ def _capture_exception(exc_info: "ExcInfo") -> None: scope.transaction.set_status(SPANSTATUS.INTERNAL_ERROR) + if exc_info[0] in ARQ_CONTROL_FLOW_EXCEPTIONS: + return + event, hint = event_from_exception( exc_info, client_options=sentry_sdk.get_client().options,