diff --git a/sentry_sdk/integrations/rq.py b/sentry_sdk/integrations/rq.py index ce8746e80d..ba4884d5c8 100644 --- a/sentry_sdk/integrations/rq.py +++ b/sentry_sdk/integrations/rq.py @@ -2,15 +2,16 @@ import sentry_sdk from sentry_sdk.api import continue_trace -from sentry_sdk.consts import OP +from sentry_sdk.consts import OP, SPANDATA from sentry_sdk.integrations import DidNotEnable, Integration, _check_minimum_version from sentry_sdk.integrations.logging import ignore_logger -from sentry_sdk.scope import should_send_default_pii +from sentry_sdk.scope import Scope, should_send_default_pii +from sentry_sdk.traces import SegmentSource from sentry_sdk.tracing import TransactionSource +from sentry_sdk.tracing_utils import has_span_streaming_enabled from sentry_sdk.utils import ( SENSITIVE_DATA_SUBSTITUTE, capture_internal_exceptions, - ensure_integration_enabled, event_from_exception, format_timestamp, parse_version, @@ -61,30 +62,59 @@ def setup_once() -> None: old_perform_job = worker_cls.perform_job - @ensure_integration_enabled(RqIntegration, old_perform_job) def sentry_patched_perform_job( self: "Any", job: "Job", *args: "Queue", **kwargs: "Any" ) -> bool: + client = sentry_sdk.get_client() + if client.get_integration(RqIntegration) is None: + return old_perform_job(self, job, *args, **kwargs) + with sentry_sdk.new_scope() as scope: scope.clear_breadcrumbs() scope.add_event_processor(_make_event_processor(weakref.ref(job))) - transaction = continue_trace( - job.meta.get("_sentry_trace_headers") or {}, - op=OP.QUEUE_TASK_RQ, - name="unknown RQ task", - source=TransactionSource.TASK, - origin=RqIntegration.origin, - ) - - with capture_internal_exceptions(): - transaction.name = job.func_name - - with sentry_sdk.start_transaction( - transaction, - custom_sampling_context={"rq_job": job}, - ): - rv = old_perform_job(self, job, *args, **kwargs) + if has_span_streaming_enabled(client.options): + sentry_sdk.traces.continue_trace( + job.meta.get("_sentry_trace_headers") or {} + ) + + Scope.set_custom_sampling_context({"rq_job": job}) + + func_name = None + with capture_internal_exceptions(): + func_name = job.func_name + + with sentry_sdk.traces.start_span( + name="unknown RQ task" if func_name is None else func_name, + attributes={ + "sentry.op": OP.QUEUE_TASK_RQ, + "sentry.origin": RqIntegration.origin, + "sentry.span.source": SegmentSource.TASK, + SPANDATA.MESSAGING_MESSAGE_ID: job.id, + }, + parent_span=None, + ) as span: + if func_name is not None: + span.set_attribute(SPANDATA.CODE_FUNCTION_NAME, func_name) + + rv = old_perform_job(self, job, *args, **kwargs) + else: + transaction = continue_trace( + job.meta.get("_sentry_trace_headers") or {}, + op=OP.QUEUE_TASK_RQ, + name="unknown RQ task", + source=TransactionSource.TASK, + origin=RqIntegration.origin, + ) + + with capture_internal_exceptions(): + transaction.name = job.func_name + + with sentry_sdk.start_transaction( + transaction, + custom_sampling_context={"rq_job": job}, + ): + rv = old_perform_job(self, job, *args, **kwargs) if self.is_horse: # We're inside of a forked process and RQ is @@ -116,12 +146,20 @@ def sentry_patched_handle_exception( old_enqueue_job = Queue.enqueue_job - @ensure_integration_enabled(RqIntegration, old_enqueue_job) def sentry_patched_enqueue_job( self: "Queue", job: "Any", **kwargs: "Any" ) -> "Any": + client = sentry_sdk.get_client() + if client.get_integration(RqIntegration) is None: + return old_enqueue_job(self, job, **kwargs) + scope = sentry_sdk.get_current_scope() - if scope.span is not None: + span = ( + scope.streamed_span + if has_span_streaming_enabled(client.options) + else scope.span + ) + if span is not None: job.meta["_sentry_trace_headers"] = dict( scope.iter_trace_propagation_headers() ) diff --git a/tests/conftest.py b/tests/conftest.py index 77618b1e6a..7d0068b3cb 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -466,8 +466,8 @@ def append(envelope): def flush(timeout=None, callback=None): real_flush(timeout=timeout, callback=callback) - items_w.write(json.dumps(telemetry).encode("utf-8")) - items_w.write(b"\n") + items_w.write(json.dumps(telemetry).encode("utf-8") + b"\n") + items_w.write(b"flush\n") monkeypatch.setattr(test_client.transport, "capture_envelope", append) monkeypatch.setattr(test_client, "flush", flush) diff --git a/tests/integrations/rq/test_rq.py b/tests/integrations/rq/test_rq.py index 42a032f8a9..9b82a31110 100644 --- a/tests/integrations/rq/test_rq.py +++ b/tests/integrations/rq/test_rq.py @@ -6,6 +6,7 @@ import sentry_sdk from sentry_sdk import start_transaction +from sentry_sdk.consts import SPANDATA from sentry_sdk.integrations.rq import RqIntegration from sentry_sdk.utils import SENSITIVE_DATA_SUBSTITUTE, parse_version @@ -47,24 +48,48 @@ def do_trick(dog, trick): @pytest.mark.parametrize("send_default_pii", [True, False]) -def test_basic(sentry_init, capture_events, send_default_pii): - sentry_init(integrations=[RqIntegration()], send_default_pii=send_default_pii) - events = capture_events() +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_basic( + sentry_init, + capture_events, + capture_items, + send_default_pii, + span_streaming, +): + sentry_init( + integrations=[RqIntegration()], + send_default_pii=send_default_pii, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) queue = rq.Queue(connection=FakeStrictRedis()) worker = rq.SimpleWorker([queue], connection=queue.connection) - queue.enqueue(crashing_job, foo=42) - worker.work(burst=True) + if span_streaming: + items = capture_items("event") - (event,) = events + queue.enqueue(crashing_job, foo=42) + worker.work(burst=True) + + (event,) = (item.payload for item in items) - (exception,) = event["exception"]["values"] - assert exception["type"] == "ZeroDivisionError" - assert exception["mechanism"]["type"] == "rq" - assert exception["stacktrace"]["frames"][-1]["vars"]["foo"] == "42" + (exception,) = event["exception"]["values"] + assert exception["type"] == "ZeroDivisionError" + assert exception["mechanism"]["type"] == "rq" + assert exception["stacktrace"]["frames"][-1]["vars"]["foo"] == "42" + else: + events = capture_events() + + queue.enqueue(crashing_job, foo=42) + worker.work(burst=True) - assert event["transaction"] == "tests.integrations.rq.test_rq.crashing_job" + (event,) = events + + (exception,) = event["exception"]["values"] + assert exception["type"] == "ZeroDivisionError" + assert exception["mechanism"]["type"] == "rq" + assert exception["stacktrace"]["frames"][-1]["vars"]["foo"] == "42" + assert event["transaction"] == "tests.integrations.rq.test_rq.crashing_job" extra = event["extra"]["rq-job"] if send_default_pii: @@ -83,180 +108,352 @@ def test_basic(sentry_init, capture_events, send_default_pii): assert "started_at" in extra -def test_transport_shutdown(sentry_init, capture_events_forksafe): - sentry_init(integrations=[RqIntegration()]) - - events = capture_events_forksafe() +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_transport_shutdown( + sentry_init, capture_events_forksafe, capture_items_forksafe, span_streaming +): + sentry_init( + integrations=[RqIntegration()], + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) queue = rq.Queue(connection=FakeStrictRedis()) worker = rq.Worker([queue], connection=queue.connection) - queue.enqueue(crashing_job, foo=42) - worker.work(burst=True) + if span_streaming: + items = capture_items_forksafe("event") + + queue.enqueue(crashing_job, foo=42) + worker.work(burst=True) + + captured_items = items.read_event() + items.read_flush() + + event = next(item["payload"] for item in captured_items) + (exception,) = event["exception"]["values"] + assert exception["type"] == "ZeroDivisionError" + else: + events = capture_events_forksafe() + + queue.enqueue(crashing_job, foo=42) + worker.work(burst=True) - event = events.read_event() - events.read_flush() + event = events.read_event() + events.read_flush() - (exception,) = event["exception"]["values"] - assert exception["type"] == "ZeroDivisionError" + (exception,) = event["exception"]["values"] + assert exception["type"] == "ZeroDivisionError" @pytest.mark.parametrize("send_default_pii", [True, False]) +@pytest.mark.parametrize("span_streaming", [True, False]) def test_transaction_with_error( sentry_init, capture_events, - DictionaryContaining, # noqa:N803 + capture_items, + DictionaryContaining, send_default_pii, + span_streaming, ): sentry_init( integrations=[RqIntegration()], traces_sample_rate=1.0, send_default_pii=send_default_pii, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) - events = capture_events() queue = rq.Queue(connection=FakeStrictRedis()) worker = rq.SimpleWorker([queue], connection=queue.connection) - queue.enqueue(chew_up_shoes, "Charlie", "Katie", shoes="flip-flops") - worker.work(burst=True) + if span_streaming: + items = capture_items("event", "span") - error_event, envelope = events + queue.enqueue(chew_up_shoes, "Charlie", "Katie", shoes="flip-flops") + worker.work(burst=True) - assert error_event["transaction"] == "tests.integrations.rq.test_rq.chew_up_shoes" - assert error_event["contexts"]["trace"]["op"] == "queue.task.rq" - assert error_event["exception"]["values"][0]["type"] == "Exception" - assert ( - error_event["exception"]["values"][0]["value"] - == "Charlie!! Why did you eat Katie's flip-flops??" - ) + (error_event,) = (item.payload for item in items if item.type == "event") - assert envelope["type"] == "transaction" - assert envelope["contexts"]["trace"] == error_event["contexts"]["trace"] - assert envelope["transaction"] == error_event["transaction"] - assert envelope["extra"]["rq-job"] == DictionaryContaining( - { - "args": ( - ["Charlie", "Katie"] if send_default_pii else SENSITIVE_DATA_SUBSTITUTE - ), - "kwargs": ( - {"shoes": "flip-flops"} - if send_default_pii - else SENSITIVE_DATA_SUBSTITUTE - ), - "func": "tests.integrations.rq.test_rq.chew_up_shoes", - "description": "tests.integrations.rq.test_rq.chew_up_shoes('Charlie', 'Katie', shoes='flip-flops')", - } - ) + assert ( + error_event["transaction"] == "tests.integrations.rq.test_rq.chew_up_shoes" + ) + assert error_event["contexts"]["trace"]["op"] == "queue.task.rq" + assert error_event["exception"]["values"][0]["type"] == "Exception" + assert ( + error_event["exception"]["values"][0]["value"] + == "Charlie!! Why did you eat Katie's flip-flops??" + ) + + sentry_sdk.flush() + spans = [item.payload for item in items if item.type == "span"] + (span,) = ( + span + for span in spans + if span["attributes"].get("sentry.op") == "queue.task.rq" + ) + + assert span["trace_id"] == error_event["contexts"]["trace"]["trace_id"] + assert span["span_id"] == error_event["contexts"]["trace"]["span_id"] + assert span["attributes"]["sentry.op"] == error_event["contexts"]["trace"]["op"] + assert ( + span["attributes"]["sentry.origin"] + == error_event["contexts"]["trace"]["origin"] + ) + assert ( + span["attributes"][SPANDATA.CODE_FUNCTION_NAME] + == "tests.integrations.rq.test_rq.chew_up_shoes" + ) + else: + events = capture_events() + + queue.enqueue(chew_up_shoes, "Charlie", "Katie", shoes="flip-flops") + worker.work(burst=True) + + error_event, envelope = events + + assert ( + error_event["transaction"] == "tests.integrations.rq.test_rq.chew_up_shoes" + ) + assert error_event["contexts"]["trace"]["op"] == "queue.task.rq" + assert error_event["exception"]["values"][0]["type"] == "Exception" + assert ( + error_event["exception"]["values"][0]["value"] + == "Charlie!! Why did you eat Katie's flip-flops??" + ) + assert envelope["type"] == "transaction" + assert envelope["contexts"]["trace"] == error_event["contexts"]["trace"] + assert envelope["transaction"] == error_event["transaction"] + assert envelope["extra"]["rq-job"] == DictionaryContaining( + { + "args": ( + ["Charlie", "Katie"] + if send_default_pii + else SENSITIVE_DATA_SUBSTITUTE + ), + "kwargs": ( + {"shoes": "flip-flops"} + if send_default_pii + else SENSITIVE_DATA_SUBSTITUTE + ), + "func": "tests.integrations.rq.test_rq.chew_up_shoes", + "description": "tests.integrations.rq.test_rq.chew_up_shoes('Charlie', 'Katie', shoes='flip-flops')", + } + ) + +@pytest.mark.parametrize("span_streaming", [True, False]) def test_error_has_trace_context_if_tracing_disabled( sentry_init, capture_events, + capture_items, + span_streaming, ): - sentry_init(integrations=[RqIntegration()]) - events = capture_events() + sentry_init( + integrations=[RqIntegration()], + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) queue = rq.Queue(connection=FakeStrictRedis()) worker = rq.SimpleWorker([queue], connection=queue.connection) - queue.enqueue(crashing_job, foo=None) - worker.work(burst=True) + if span_streaming: + items = capture_items("event") + + queue.enqueue(crashing_job, foo=None) + worker.work(burst=True) + + (error_event,) = (item.payload for item in items) + else: + events = capture_events() + + queue.enqueue(crashing_job, foo=None) + worker.work(burst=True) - (error_event,) = events + (error_event,) = events assert error_event["contexts"]["trace"] +@pytest.mark.parametrize("span_streaming", [True, False]) def test_tracing_enabled( sentry_init, capture_events, + capture_items, + span_streaming, ): - sentry_init(integrations=[RqIntegration()], traces_sample_rate=1.0) - events = capture_events() + sentry_init( + integrations=[RqIntegration()], + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) queue = rq.Queue(connection=FakeStrictRedis()) worker = rq.SimpleWorker([queue], connection=queue.connection) - with start_transaction(op="rq transaction") as transaction: - queue.enqueue(crashing_job, foo=None) - worker.work(burst=True) + if span_streaming: + items = capture_items("event", "span") + + with sentry_sdk.traces.start_span( + name="custom parent", + attributes={ + "sentry.op": "rq transaction", + }, + ) as span: + queue.enqueue(crashing_job, foo=None) + worker.work(burst=True) - error_event, envelope, _ = events + (error_event,) = (item.payload for item in items) - assert error_event["transaction"] == "tests.integrations.rq.test_rq.crashing_job" - assert error_event["contexts"]["trace"]["trace_id"] == transaction.trace_id + assert error_event["contexts"]["trace"]["trace_id"] == span.trace_id - assert envelope["contexts"]["trace"] == error_event["contexts"]["trace"] + sentry_sdk.flush() + spans = [item.payload for item in items if item.type == "span"] + (span,) = ( + span + for span in spans + if span["attributes"].get("sentry.op") == "queue.task.rq" + ) + + assert span["trace_id"] == error_event["contexts"]["trace"]["trace_id"] + assert span["span_id"] == error_event["contexts"]["trace"]["span_id"] + assert span["attributes"]["sentry.op"] == error_event["contexts"]["trace"]["op"] + assert ( + span["attributes"]["sentry.origin"] + == error_event["contexts"]["trace"]["origin"] + ) + else: + events = capture_events() + + with start_transaction(op="rq transaction") as transaction: + queue.enqueue(crashing_job, foo=None) + worker.work(burst=True) + + error_event, envelope, _ = events + + assert ( + error_event["transaction"] == "tests.integrations.rq.test_rq.crashing_job" + ) + assert error_event["contexts"]["trace"]["trace_id"] == transaction.trace_id + + assert envelope["contexts"]["trace"] == error_event["contexts"]["trace"] +@pytest.mark.parametrize("span_streaming", [True, False]) def test_tracing_disabled( sentry_init, capture_events, + capture_items, + span_streaming, ): - sentry_init(integrations=[RqIntegration()]) - events = capture_events() + sentry_init( + integrations=[RqIntegration()], + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) queue = rq.Queue(connection=FakeStrictRedis()) worker = rq.SimpleWorker([queue], connection=queue.connection) - scope = sentry_sdk.get_isolation_scope() - queue.enqueue(crashing_job, foo=None) - worker.work(burst=True) + if span_streaming: + items = capture_items("event") - (error_event,) = events + queue.enqueue(crashing_job, foo=None) + worker.work(burst=True) - assert error_event["transaction"] == "tests.integrations.rq.test_rq.crashing_job" - assert ( - error_event["contexts"]["trace"]["trace_id"] - == scope._propagation_context.trace_id - ) + (error_event,) = (item.payload for item in items) + + assert error_event["contexts"]["trace"]["trace_id"] + else: + events = capture_events() + + scope = sentry_sdk.get_isolation_scope() + queue.enqueue(crashing_job, foo=None) + worker.work(burst=True) + + (error_event,) = events + + assert ( + error_event["transaction"] == "tests.integrations.rq.test_rq.crashing_job" + ) + assert ( + error_event["contexts"]["trace"]["trace_id"] + == scope._propagation_context.trace_id + ) @pytest.mark.parametrize("send_default_pii", [True, False]) +@pytest.mark.parametrize("span_streaming", [True, False]) def test_transaction_no_error( sentry_init, capture_events, - DictionaryContaining, # noqa:N803 + capture_items, + DictionaryContaining, send_default_pii, + span_streaming, ): sentry_init( integrations=[RqIntegration()], traces_sample_rate=1.0, send_default_pii=send_default_pii, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) - events = capture_events() queue = rq.Queue(connection=FakeStrictRedis()) worker = rq.SimpleWorker([queue], connection=queue.connection) - queue.enqueue(do_trick, "Maisey", trick="kangaroo") - worker.work(burst=True) + if span_streaming: + items = capture_items("span") - envelope = events[0] - - assert envelope["type"] == "transaction" - assert envelope["contexts"]["trace"]["op"] == "queue.task.rq" - assert envelope["transaction"] == "tests.integrations.rq.test_rq.do_trick" - assert envelope["extra"]["rq-job"] == DictionaryContaining( - { - "args": ["Maisey"] if send_default_pii else SENSITIVE_DATA_SUBSTITUTE, - "kwargs": ( - {"trick": "kangaroo"} if send_default_pii else SENSITIVE_DATA_SUBSTITUTE - ), - "func": "tests.integrations.rq.test_rq.do_trick", - "description": "tests.integrations.rq.test_rq.do_trick('Maisey', trick='kangaroo')", - } - ) + queue.enqueue(do_trick, "Maisey", trick="kangaroo") + worker.work(burst=True) + + sentry_sdk.flush() + spans = [item.payload for item in items] + (span,) = ( + span + for span in spans + if span["attributes"].get("sentry.op") == "queue.task.rq" + ) + + assert span["attributes"]["sentry.op"] == "queue.task.rq" + assert span["name"] == "tests.integrations.rq.test_rq.do_trick" + else: + events = capture_events() + + queue.enqueue(do_trick, "Maisey", trick="kangaroo") + worker.work(burst=True) + + envelope = events[0] + + assert envelope["type"] == "transaction" + assert envelope["contexts"]["trace"]["op"] == "queue.task.rq" + assert envelope["transaction"] == "tests.integrations.rq.test_rq.do_trick" + assert envelope["extra"]["rq-job"] == DictionaryContaining( + { + "args": ["Maisey"] if send_default_pii else SENSITIVE_DATA_SUBSTITUTE, + "kwargs": ( + {"trick": "kangaroo"} + if send_default_pii + else SENSITIVE_DATA_SUBSTITUTE + ), + "func": "tests.integrations.rq.test_rq.do_trick", + "description": "tests.integrations.rq.test_rq.do_trick('Maisey', trick='kangaroo')", + } + ) +@pytest.mark.parametrize("span_streaming", [True, False]) def test_traces_sampler_gets_correct_values_in_sampling_context( sentry_init, DictionaryContaining, - ObjectDescribedBy, # noqa:N803 + ObjectDescribedBy, + span_streaming, # noqa:N803 ): traces_sampler = mock.Mock(return_value=True) - sentry_init(integrations=[RqIntegration()], traces_sampler=traces_sampler) + sentry_init( + integrations=[RqIntegration()], + traces_sampler=traces_sampler, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) queue = rq.Queue(connection=FakeStrictRedis()) worker = rq.SimpleWorker([queue], connection=queue.connection) @@ -285,29 +482,73 @@ def test_traces_sampler_gets_correct_values_in_sampling_context( @pytest.mark.skipif( parse_version(rq.__version__) < (1, 5), reason="At least rq-1.5 required" ) -def test_job_with_retries(sentry_init, capture_events): - sentry_init(integrations=[RqIntegration()]) - events = capture_events() +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_job_with_retries( + sentry_init, + capture_events, + capture_items, + span_streaming, +): + sentry_init( + integrations=[RqIntegration()], + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) queue = rq.Queue(connection=FakeStrictRedis()) worker = rq.SimpleWorker([queue], connection=queue.connection) - queue.enqueue(crashing_job, foo=42, retry=rq.Retry(max=1)) - worker.work(burst=True) + if span_streaming: + items = capture_items("event") + + queue.enqueue(crashing_job, foo=42, retry=rq.Retry(max=1)) + worker.work(burst=True) + events = [item.payload for item in items] + else: + events = capture_events() + + queue.enqueue(crashing_job, foo=42, retry=rq.Retry(max=1)) + worker.work(burst=True) assert len(events) == 1 -def test_span_origin(sentry_init, capture_events): - sentry_init(integrations=[RqIntegration()], traces_sample_rate=1.0) - events = capture_events() +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_span_origin( + sentry_init, + capture_events, + capture_items, + span_streaming, +): + sentry_init( + integrations=[RqIntegration()], + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) queue = rq.Queue(connection=FakeStrictRedis()) worker = rq.SimpleWorker([queue], connection=queue.connection) - queue.enqueue(do_trick, "Maisey", trick="kangaroo") - worker.work(burst=True) + if span_streaming: + items = capture_items("span") + + queue.enqueue(do_trick, "Maisey", trick="kangaroo") + worker.work(burst=True) + + sentry_sdk.flush() + spans = [item.payload for item in items if item.type == "span"] + (span,) = ( + span + for span in spans + if span["attributes"].get("sentry.op") == "queue.task.rq" + ) + + assert span["attributes"]["sentry.origin"] == "auto.queue.rq" + else: + events = capture_events() + + queue.enqueue(do_trick, "Maisey", trick="kangaroo") + worker.work(burst=True) - (event,) = events + (event,) = events - assert event["contexts"]["trace"]["origin"] == "auto.queue.rq" + assert event["contexts"]["trace"]["origin"] == "auto.queue.rq"