diff --git a/sentry_sdk/integrations/arq.py b/sentry_sdk/integrations/arq.py index bbdb74bc8a..da03bafb8b 100644 --- a/sentry_sdk/integrations/arq.py +++ b/sentry_sdk/integrations/arq.py @@ -1,13 +1,16 @@ import sys import sentry_sdk -from sentry_sdk.consts import OP, SPANSTATUS +from sentry_sdk.consts import OP, SPANDATA, SPANSTATUS from sentry_sdk.integrations import DidNotEnable, Integration, _check_minimum_version from sentry_sdk.integrations.logging import ignore_logger from sentry_sdk.scope import should_send_default_pii +from sentry_sdk.traces import SegmentSource from sentry_sdk.tracing import Transaction, TransactionSource +from sentry_sdk.tracing_utils import has_span_streaming_enabled from sentry_sdk.utils import ( SENSITIVE_DATA_SUBSTITUTE, + _register_control_flow_exception, capture_internal_exceptions, ensure_integration_enabled, event_from_exception, @@ -59,6 +62,8 @@ def setup_once() -> None: patch_run_job() patch_create_worker() + _register_control_flow_exception(ARQ_CONTROL_FLOW_EXCEPTIONS) # type: ignore + ignore_logger("arq.worker") @@ -69,10 +74,20 @@ def patch_enqueue_job() -> None: async def _sentry_enqueue_job( self: "ArqRedis", function: str, *args: "Any", **kwargs: "Any" ) -> "Optional[Job]": - integration = sentry_sdk.get_client().get_integration(ArqIntegration) - if integration is None: + client = sentry_sdk.get_client() + if client.get_integration(ArqIntegration) is None: return await old_enqueue_job(self, function, *args, **kwargs) + if has_span_streaming_enabled(client.options): + with sentry_sdk.traces.start_span( + name=function, + attributes={ + "sentry.op": OP.QUEUE_SUBMIT_ARQ, + "sentry.origin": ArqIntegration.origin, + }, + ): + return await old_enqueue_job(self, function, *args, **kwargs) + with sentry_sdk.start_span( op=OP.QUEUE_SUBMIT_ARQ, name=function, origin=ArqIntegration.origin ): @@ -86,14 +101,27 @@ def patch_run_job() -> None: old_run_job = Worker.run_job async def _sentry_run_job(self: "Worker", job_id: str, score: int) -> None: - integration = sentry_sdk.get_client().get_integration(ArqIntegration) - if integration is None: + client = sentry_sdk.get_client() + if client.get_integration(ArqIntegration) is None: return await old_run_job(self, job_id, score) with sentry_sdk.isolation_scope() as scope: scope._name = "arq" scope.clear_breadcrumbs() + if has_span_streaming_enabled(client.options): + with sentry_sdk.traces.start_span( + name="unknown arq task", + attributes={ + "sentry.op": OP.QUEUE_TASK_ARQ, + "sentry.origin": ArqIntegration.origin, + "sentry.span.source": SegmentSource.TASK, + SPANDATA.MESSAGING_MESSAGE_ID: job_id, + }, + parent_span=None, + ): + return await old_run_job(self, job_id, score) + transaction = Transaction( name="unknown arq task", status="ok", @@ -163,10 +191,19 @@ def _wrap_coroutine(name: str, coroutine: "WorkerCoroutine") -> "WorkerCoroutine async def _sentry_coroutine( ctx: "Dict[Any, Any]", *args: "Any", **kwargs: "Any" ) -> "Any": - integration = sentry_sdk.get_client().get_integration(ArqIntegration) + client = sentry_sdk.get_client() + integration = client.get_integration(ArqIntegration) if integration is None: return await coroutine(ctx, *args, **kwargs) + if has_span_streaming_enabled(client.options): + scope = sentry_sdk.get_current_scope() + span = scope.streamed_span + if span is not None: + span.name = name + + scope.set_transaction_name(name) + sentry_sdk.get_isolation_scope().add_event_processor( _make_event_processor({**ctx, "job_name": name}, *args, **kwargs) ) diff --git a/tests/integrations/arq/test_arq.py b/tests/integrations/arq/test_arq.py index 49dd48f85e..d18e90d4eb 100644 --- a/tests/integrations/arq/test_arq.py +++ b/tests/integrations/arq/test_arq.py @@ -9,6 +9,7 @@ from arq.utils import timestamp_ms from fakeredis.aioredis import FakeRedis +import sentry_sdk from sentry_sdk import get_client, start_transaction from sentry_sdk.integrations.arq import ArqIntegration @@ -41,6 +42,7 @@ def info(self, section): @pytest.fixture def init_arq(sentry_init): def inner( + span_streaming, cls_functions=None, cls_cron_jobs=None, kw_functions=None, @@ -60,6 +62,7 @@ def inner( integrations=[ArqIntegration()], traces_sample_rate=1.0, send_default_pii=True, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) server = FakeRedis() @@ -86,6 +89,7 @@ class WorkerSettings: @pytest.fixture def init_arq_with_dict_settings(sentry_init): def inner( + span_streaming, cls_functions=None, cls_cron_jobs=None, kw_functions=None, @@ -105,6 +109,7 @@ def inner( integrations=[ArqIntegration()], traces_sample_rate=1.0, send_default_pii=True, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) server = FakeRedis() @@ -134,6 +139,7 @@ def init_arq_with_kwarg_settings(sentry_init): """Test fixture that passes settings_cls as keyword argument only.""" def inner( + span_streaming, cls_functions=None, cls_cron_jobs=None, kw_functions=None, @@ -153,6 +159,7 @@ def inner( integrations=[ArqIntegration()], traces_sample_rate=1.0, send_default_pii=True, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) server = FakeRedis() @@ -182,7 +189,8 @@ class WorkerSettings: "init_arq_settings", ["init_arq", "init_arq_with_dict_settings", "init_arq_with_kwarg_settings"], ) -async def test_job_result(init_arq_settings, request): +@pytest.mark.parametrize("span_streaming", [True, False]) +async def test_job_result(init_arq_settings, request, span_streaming): async def increase(ctx, num): return num + 1 @@ -190,7 +198,7 @@ async def increase(ctx, num): increase.__qualname__ = increase.__name__ - pool, worker = init_fixture_method([increase]) + pool, worker = init_fixture_method(span_streaming, [increase]) job = await pool.enqueue_job("increase", 3) @@ -208,7 +216,14 @@ async def increase(ctx, num): @pytest.mark.parametrize( "init_arq_settings", ["init_arq", "init_arq_with_dict_settings"] ) -async def test_job_retry(capture_events, init_arq_settings, request): +@pytest.mark.parametrize("span_streaming", [True, False]) +async def test_job_retry( + capture_events, + capture_items, + init_arq_settings, + request, + span_streaming, +): async def retry_job(ctx): if ctx["job_try"] < 2: raise arq.worker.Retry @@ -217,27 +232,48 @@ async def retry_job(ctx): retry_job.__qualname__ = retry_job.__name__ - pool, worker = init_fixture_method([retry_job]) + pool, worker = init_fixture_method(span_streaming, [retry_job]) job = await pool.enqueue_job("retry_job") - events = capture_events() + if span_streaming: + items = capture_items("span") - await worker.run_job(job.job_id, timestamp_ms()) + await worker.run_job(job.job_id, timestamp_ms()) - event = events.pop(0) - assert event["contexts"]["trace"]["status"] == "aborted" - assert event["transaction"] == "retry_job" - assert event["tags"]["arq_task_id"] == job.job_id - assert event["extra"]["arq-job"]["retry"] == 1 + sentry_sdk.flush() + spans = [item.payload for item in items] - await worker.run_job(job.job_id, timestamp_ms()) + assert spans[4]["attributes"]["sentry.op"] == "queue.task.arq" + assert spans[4]["status"] == "ok" + assert spans[4]["name"] == "retry_job" + + await worker.run_job(job.job_id, timestamp_ms()) + + sentry_sdk.flush() + spans = [item.payload for item in items] + + assert spans[7]["attributes"]["sentry.op"] == "queue.task.arq" + assert spans[7]["status"] == "ok" + assert spans[7]["name"] == "retry_job" + else: + events = capture_events() + + await worker.run_job(job.job_id, timestamp_ms()) - event = events.pop(0) - assert event["contexts"]["trace"]["status"] == "ok" - assert event["transaction"] == "retry_job" - assert event["tags"]["arq_task_id"] == job.job_id - assert event["extra"]["arq-job"]["retry"] == 2 + event = events.pop(0) + assert event["contexts"]["trace"]["status"] == "aborted" + assert event["transaction"] == "retry_job" + assert event["tags"]["arq_task_id"] == job.job_id + assert event["extra"]["arq-job"]["retry"] == 1 + + await worker.run_job(job.job_id, timestamp_ms()) + + event = events.pop(0) + assert event["contexts"]["trace"]["status"] == "ok" + assert event["transaction"] == "retry_job" + assert event["tags"]["arq_task_id"] == job.job_id + assert event["extra"]["arq-job"]["retry"] == 2 @pytest.mark.parametrize( @@ -248,8 +284,15 @@ async def retry_job(ctx): "init_arq_settings", ["init_arq", "init_arq_with_dict_settings"] ) @pytest.mark.asyncio +@pytest.mark.parametrize("span_streaming", [True, False]) async def test_job_transaction( - capture_events, init_arq_settings, source, job_fails, request + capture_events, + capture_items, + init_arq_settings, + source, + job_fails, + request, + span_streaming, ): async def division(_, a, b=0): return a / b @@ -265,65 +308,126 @@ async def division(_, a, b=0): functions_key, cron_jobs_key = source pool, worker = init_fixture_method( - **{functions_key: [division], cron_jobs_key: [cron_job]} + span_streaming, **{functions_key: [division], cron_jobs_key: [cron_job]} ) - events = capture_events() - job = await pool.enqueue_job("division", 1, b=int(not job_fails)) - await worker.run_job(job.job_id, timestamp_ms()) - loop = asyncio.get_event_loop() - task = loop.create_task(worker.async_run()) - await asyncio.sleep(1) + if span_streaming: + items = capture_items("event", "span") - task.cancel() + await worker.run_job(job.job_id, timestamp_ms()) - await worker.close() + loop = asyncio.get_event_loop() + task = loop.create_task(worker.async_run()) + await asyncio.sleep(1) - if job_fails: - error_func_event = events.pop(0) - error_cron_event = events.pop(1) + task.cancel() - assert error_func_event["exception"]["values"][0]["type"] == "ZeroDivisionError" - assert error_func_event["exception"]["values"][0]["mechanism"]["type"] == "arq" + await worker.close() - func_extra = error_func_event["extra"]["arq-job"] - assert func_extra["task"] == "division" + events = [item.payload for item in items if item.type == "event"] + if job_fails: + error_func_event = events.pop(0) + error_cron_event = events.pop(0) - assert error_cron_event["exception"]["values"][0]["type"] == "ZeroDivisionError" - assert error_cron_event["exception"]["values"][0]["mechanism"]["type"] == "arq" + assert ( + error_func_event["exception"]["values"][0]["type"] + == "ZeroDivisionError" + ) + assert ( + error_func_event["exception"]["values"][0]["mechanism"]["type"] == "arq" + ) - cron_extra = error_cron_event["extra"]["arq-job"] - assert cron_extra["task"] == "cron:division" + func_extra = error_func_event["extra"]["arq-job"] + assert func_extra["task"] == "division" + + assert ( + error_cron_event["exception"]["values"][0]["type"] + == "ZeroDivisionError" + ) + assert ( + error_cron_event["exception"]["values"][0]["mechanism"]["type"] == "arq" + ) - [func_event, cron_event] = events + cron_extra = error_cron_event["extra"]["arq-job"] + assert cron_extra["task"] == "cron:division" - assert func_event["type"] == "transaction" - assert func_event["transaction"] == "division" - assert func_event["transaction_info"] == {"source": "task"} + sentry_sdk.flush() + spans = [item.payload for item in items if item.type == "span"] - assert "arq_task_id" in func_event["tags"] - assert "arq_task_retry" in func_event["tags"] + assert spans[4]["attributes"]["sentry.op"] == "queue.task.arq" + assert spans[4]["name"] == "division" + assert spans[4]["attributes"]["sentry.span.source"] == "task" - func_extra = func_event["extra"]["arq-job"] + assert spans[15]["attributes"]["sentry.op"] == "queue.task.arq" + assert spans[15]["name"] == "cron:division" + else: + events = capture_events() - assert func_extra["task"] == "division" - assert func_extra["kwargs"] == {"b": int(not job_fails)} - assert func_extra["retry"] == 1 + await worker.run_job(job.job_id, timestamp_ms()) - assert cron_event["type"] == "transaction" - assert cron_event["transaction"] == "cron:division" - assert cron_event["transaction_info"] == {"source": "task"} + loop = asyncio.get_event_loop() + task = loop.create_task(worker.async_run()) + await asyncio.sleep(1) - assert "arq_task_id" in cron_event["tags"] - assert "arq_task_retry" in cron_event["tags"] + task.cancel() - cron_extra = cron_event["extra"]["arq-job"] + await worker.close() - assert cron_extra["task"] == "cron:division" - assert cron_extra["kwargs"] == {} - assert cron_extra["retry"] == 1 + if job_fails: + error_func_event = events.pop(0) + error_cron_event = events.pop(1) + + assert ( + error_func_event["exception"]["values"][0]["type"] + == "ZeroDivisionError" + ) + assert ( + error_func_event["exception"]["values"][0]["mechanism"]["type"] == "arq" + ) + + func_extra = error_func_event["extra"]["arq-job"] + assert func_extra["task"] == "division" + + assert ( + error_cron_event["exception"]["values"][0]["type"] + == "ZeroDivisionError" + ) + assert ( + error_cron_event["exception"]["values"][0]["mechanism"]["type"] == "arq" + ) + + cron_extra = error_cron_event["extra"]["arq-job"] + assert cron_extra["task"] == "cron:division" + + [func_event, cron_event] = events + + assert func_event["type"] == "transaction" + assert func_event["transaction"] == "division" + assert func_event["transaction_info"] == {"source": "task"} + + assert "arq_task_id" in func_event["tags"] + assert "arq_task_retry" in func_event["tags"] + + func_extra = func_event["extra"]["arq-job"] + + assert func_extra["task"] == "division" + assert func_extra["kwargs"] == {"b": int(not job_fails)} + assert func_extra["retry"] == 1 + + assert cron_event["type"] == "transaction" + assert cron_event["transaction"] == "cron:division" + assert cron_event["transaction_info"] == {"source": "task"} + + assert "arq_task_id" in cron_event["tags"] + assert "arq_task_retry" in cron_event["tags"] + + cron_extra = cron_event["extra"]["arq-job"] + + assert cron_extra["task"] == "cron:division" + assert cron_extra["kwargs"] == {} + assert cron_extra["retry"] == 1 @pytest.mark.parametrize("source", ["cls_functions", "kw_functions"]) @@ -331,34 +435,61 @@ async def division(_, a, b=0): "init_arq_settings", ["init_arq", "init_arq_with_dict_settings"] ) @pytest.mark.asyncio -async def test_enqueue_job(capture_events, init_arq_settings, source, request): +@pytest.mark.parametrize("span_streaming", [True, False]) +async def test_enqueue_job( + capture_events, + capture_items, + init_arq_settings, + source, + request, + span_streaming, +): async def dummy_job(_): pass init_fixture_method = request.getfixturevalue(init_arq_settings) - pool, _ = init_fixture_method(**{source: [dummy_job]}) + pool, _ = init_fixture_method(span_streaming, **{source: [dummy_job]}) + + if span_streaming: + items = capture_items("span") + + with sentry_sdk.traces.start_span(name="custom parent") as span: + await pool.enqueue_job("dummy_job") + + sentry_sdk.flush() + spans = [item.payload for item in items] + + assert spans[2]["is_segment"] is True + assert spans[2]["trace_id"] == span.trace_id + assert spans[2]["span_id"] == span.span_id - events = capture_events() + assert spans[1]["attributes"]["sentry.op"] == "queue.submit.arq" + assert spans[1]["name"] == "dummy_job" + else: + events = capture_events() - with start_transaction() as transaction: - await pool.enqueue_job("dummy_job") + with start_transaction() as transaction: + await pool.enqueue_job("dummy_job") - (event,) = events + (event,) = events - assert event["contexts"]["trace"]["trace_id"] == transaction.trace_id - assert event["contexts"]["trace"]["span_id"] == transaction.span_id + assert event["contexts"]["trace"]["trace_id"] == transaction.trace_id + assert event["contexts"]["trace"]["span_id"] == transaction.span_id - assert len(event["spans"]) - assert event["spans"][0]["op"] == "queue.submit.arq" - assert event["spans"][0]["description"] == "dummy_job" + assert len(event["spans"]) + assert event["spans"][0]["op"] == "queue.submit.arq" + assert event["spans"][0]["description"] == "dummy_job" @pytest.mark.asyncio @pytest.mark.parametrize( "init_arq_settings", ["init_arq", "init_arq_with_dict_settings"] ) -async def test_execute_job_without_integration(init_arq_settings, request): +@pytest.mark.parametrize("span_streaming", [True, False]) +async def test_execute_job_without_integration( + init_arq_settings, request, span_streaming +): async def dummy_job(_ctx): pass @@ -366,7 +497,7 @@ async def dummy_job(_ctx): dummy_job.__qualname__ = dummy_job.__name__ - pool, worker = init_fixture_method([dummy_job]) + pool, worker = init_fixture_method(span_streaming, [dummy_job]) # remove the integration to trigger the edge case get_client().integrations.pop("arq") @@ -382,29 +513,55 @@ async def dummy_job(_ctx): "init_arq_settings", ["init_arq", "init_arq_with_dict_settings"] ) @pytest.mark.asyncio -async def test_span_origin_producer(capture_events, init_arq_settings, source, request): +@pytest.mark.parametrize("span_streaming", [True, False]) +async def test_span_origin_producer( + capture_events, + capture_items, + init_arq_settings, + source, + request, + span_streaming, +): async def dummy_job(_): pass init_fixture_method = request.getfixturevalue(init_arq_settings) - pool, _ = init_fixture_method(**{source: [dummy_job]}) + pool, _ = init_fixture_method(span_streaming, **{source: [dummy_job]}) + + if span_streaming: + items = capture_items("span") - events = capture_events() + with sentry_sdk.traces.start_span(name="custom parent"): + await pool.enqueue_job("dummy_job") - with start_transaction(): - await pool.enqueue_job("dummy_job") + sentry_sdk.flush() + spans = [item.payload for item in items] + assert spans[2]["attributes"]["sentry.origin"] == "manual" + assert spans[1]["attributes"]["sentry.origin"] == "auto.queue.arq" + else: + events = capture_events() - (event,) = events - assert event["contexts"]["trace"]["origin"] == "manual" - assert event["spans"][0]["origin"] == "auto.queue.arq" + with start_transaction(): + await pool.enqueue_job("dummy_job") + + (event,) = events + assert event["contexts"]["trace"]["origin"] == "manual" + assert event["spans"][0]["origin"] == "auto.queue.arq" @pytest.mark.asyncio @pytest.mark.parametrize( "init_arq_settings", ["init_arq", "init_arq_with_dict_settings"] ) -async def test_span_origin_consumer(capture_events, init_arq_settings, request): +@pytest.mark.parametrize("span_streaming", [True, False]) +async def test_span_origin_consumer( + capture_events, + capture_items, + init_arq_settings, + request, + span_streaming, +): async def job(ctx): pass @@ -412,23 +569,44 @@ async def job(ctx): job.__qualname__ = job.__name__ - pool, worker = init_fixture_method([job]) + pool, worker = init_fixture_method(span_streaming, [job]) - job = await pool.enqueue_job("retry_job") + if span_streaming: + job = await pool.enqueue_job("retry_job") - events = capture_events() + items = capture_items("span") - await worker.run_job(job.job_id, timestamp_ms()) + await worker.run_job(job.job_id, timestamp_ms()) + + sentry_sdk.flush() + spans = [item.payload for item in items] + + assert spans[4]["attributes"]["sentry.op"] == "queue.task.arq" + assert spans[4]["attributes"]["sentry.origin"] == "auto.queue.arq" + assert spans[3]["attributes"]["sentry.origin"] == "auto.db.redis" + assert spans[2]["attributes"]["sentry.origin"] == "auto.db.redis" + else: + job = await pool.enqueue_job("retry_job") + + events = capture_events() - (event,) = events + await worker.run_job(job.job_id, timestamp_ms()) - assert event["contexts"]["trace"]["origin"] == "auto.queue.arq" - assert event["spans"][0]["origin"] == "auto.db.redis" - assert event["spans"][1]["origin"] == "auto.db.redis" + (event,) = events + + assert event["contexts"]["trace"]["origin"] == "auto.queue.arq" + assert event["spans"][0]["origin"] == "auto.db.redis" + assert event["spans"][1]["origin"] == "auto.db.redis" @pytest.mark.asyncio -async def test_job_concurrency(capture_events, init_arq): +@pytest.mark.parametrize("span_streaming", [True, False]) +async def test_job_concurrency( + capture_events, + capture_items, + init_arq, + span_streaming, +): """ 10 - division starts 70 - sleepy starts @@ -447,9 +625,7 @@ async def division(_): sleepy.__qualname__ = sleepy.__name__ division.__qualname__ = division.__name__ - pool, worker = init_arq([sleepy, division]) - - events = capture_events() + pool, worker = init_arq(span_streaming, [sleepy, division]) await pool.enqueue_job( "division", _job_id="123", _defer_by=timedelta(milliseconds=10) @@ -458,15 +634,34 @@ async def division(_): "sleepy", _job_id="456", _defer_by=timedelta(milliseconds=70) ) - loop = asyncio.get_event_loop() - task = loop.create_task(worker.async_run()) - await asyncio.sleep(1) + if span_streaming: + items = capture_items("event") + + loop = asyncio.get_event_loop() + task = loop.create_task(worker.async_run()) + await asyncio.sleep(1) + + task.cancel() + + await worker.close() + + events = [item.payload for item in items] + exception_event = events[0] + assert exception_event["exception"]["values"][0]["type"] == "ZeroDivisionError" + assert exception_event["transaction"] == "division" + else: + events = capture_events() + + loop = asyncio.get_event_loop() + task = loop.create_task(worker.async_run()) + await asyncio.sleep(1) + + task.cancel() - task.cancel() + await worker.close() - await worker.close() + exception_event = events[1] + assert exception_event["exception"]["values"][0]["type"] == "ZeroDivisionError" + assert exception_event["transaction"] == "division" - exception_event = events[1] - assert exception_event["exception"]["values"][0]["type"] == "ZeroDivisionError" - assert exception_event["transaction"] == "division" assert exception_event["extra"]["arq-job"]["task"] == "division"