diff --git a/CHANGELOG.md b/CHANGELOG.md index 99f9b4c06c..4fc8eb10d9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ ### Features +- Add Kafka producer instrumentation for Spring Boot 3 ([#5254](https://github.com/getsentry/sentry-java/pull/5254)) - Add `enableQueueTracing` option and messaging span data conventions ([#5250](https://github.com/getsentry/sentry-java/pull/5250)) - Prevent cross-organization trace continuation ([#5136](https://github.com/getsentry/sentry-java/pull/5136)) - By default, the SDK now extracts the organization ID from the DSN (e.g. `o123.ingest.sentry.io`) and compares it with the `sentry-org_id` value in incoming baggage headers. When the two differ, the SDK starts a fresh trace instead of continuing the foreign one. This guards against accidentally linking traces across organizations. diff --git a/sentry-spring-jakarta/api/sentry-spring-jakarta.api b/sentry-spring-jakarta/api/sentry-spring-jakarta.api index fe634da6f4..696d63c756 100644 --- a/sentry-spring-jakarta/api/sentry-spring-jakarta.api +++ b/sentry-spring-jakarta/api/sentry-spring-jakarta.api @@ -244,6 +244,20 @@ public final class io/sentry/spring/jakarta/graphql/SentrySpringSubscriptionHand public fun onSubscriptionResult (Ljava/lang/Object;Lio/sentry/IScopes;Lio/sentry/graphql/ExceptionReporter;Lgraphql/execution/instrumentation/parameters/InstrumentationFieldFetchParameters;)Ljava/lang/Object; } +public final class io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessor : org/springframework/beans/factory/config/BeanPostProcessor, org/springframework/core/PriorityOrdered { + public fun ()V + public fun getOrder ()I + public fun postProcessAfterInitialization (Ljava/lang/Object;Ljava/lang/String;)Ljava/lang/Object; +} + +public final class io/sentry/spring/jakarta/kafka/SentryProducerInterceptor : org/apache/kafka/clients/producer/ProducerInterceptor { + public fun (Lio/sentry/IScopes;)V + public fun close ()V + public fun configure (Ljava/util/Map;)V + public fun onAcknowledgement (Lorg/apache/kafka/clients/producer/RecordMetadata;Ljava/lang/Exception;)V + public fun onSend (Lorg/apache/kafka/clients/producer/ProducerRecord;)Lorg/apache/kafka/clients/producer/ProducerRecord; +} + public class io/sentry/spring/jakarta/opentelemetry/SentryOpenTelemetryAgentWithoutAutoInitConfiguration { public fun ()V public fun sentryOpenTelemetryOptionsConfiguration ()Lio/sentry/Sentry$OptionsConfiguration; diff --git a/sentry-spring-jakarta/build.gradle.kts b/sentry-spring-jakarta/build.gradle.kts index f1920e2451..93367d803f 100644 --- a/sentry-spring-jakarta/build.gradle.kts +++ b/sentry-spring-jakarta/build.gradle.kts @@ -41,6 +41,7 @@ dependencies { compileOnly(libs.servlet.jakarta.api) compileOnly(libs.slf4j.api) compileOnly(libs.springboot3.starter.graphql) + compileOnly(libs.spring.kafka3) compileOnly(libs.springboot3.starter.quartz) compileOnly(Config.Libs.springWebflux) @@ -68,6 +69,7 @@ dependencies { testImplementation(libs.springboot3.starter.aop) testImplementation(libs.springboot3.starter.graphql) testImplementation(libs.springboot3.starter.security) + testImplementation(libs.spring.kafka3) testImplementation(libs.springboot3.starter.test) testImplementation(libs.springboot3.starter.web) testImplementation(libs.springboot3.starter.webflux) diff --git a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessor.java b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessor.java new file mode 100644 index 0000000000..6ede82add7 --- /dev/null +++ b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessor.java @@ -0,0 +1,82 @@ +package io.sentry.spring.jakarta.kafka; + +import io.sentry.ScopesAdapter; +import io.sentry.SentryLevel; +import java.lang.reflect.Field; +import org.apache.kafka.clients.producer.ProducerInterceptor; +import org.jetbrains.annotations.ApiStatus; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.config.BeanPostProcessor; +import org.springframework.core.Ordered; +import org.springframework.core.PriorityOrdered; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.CompositeProducerInterceptor; + +/** + * Sets a {@link SentryProducerInterceptor} on {@link KafkaTemplate} beans via {@link + * KafkaTemplate#setProducerInterceptor(ProducerInterceptor)}. The original bean is not replaced. + * + *

If the template already has a {@link ProducerInterceptor}, both are composed using {@link + * CompositeProducerInterceptor}. Reading the existing interceptor requires reflection (no public + * getter in Spring Kafka 3.x); if reflection fails, a warning is logged and only the Sentry + * interceptor is set. + */ +@ApiStatus.Internal +public final class SentryKafkaProducerBeanPostProcessor + implements BeanPostProcessor, PriorityOrdered { + + @Override + @SuppressWarnings("unchecked") + public @NotNull Object postProcessAfterInitialization( + final @NotNull Object bean, final @NotNull String beanName) throws BeansException { + if (bean instanceof KafkaTemplate) { + final @NotNull KafkaTemplate template = (KafkaTemplate) bean; + final @Nullable ProducerInterceptor existing = getExistingInterceptor(template); + + if (existing instanceof SentryProducerInterceptor) { + return bean; + } + + @SuppressWarnings("rawtypes") + final SentryProducerInterceptor sentryInterceptor = + new SentryProducerInterceptor<>(ScopesAdapter.getInstance()); + + if (existing != null) { + @SuppressWarnings("rawtypes") + final CompositeProducerInterceptor composite = + new CompositeProducerInterceptor(sentryInterceptor, existing); + template.setProducerInterceptor(composite); + } else { + template.setProducerInterceptor(sentryInterceptor); + } + } + return bean; + } + + @SuppressWarnings("unchecked") + private @Nullable ProducerInterceptor getExistingInterceptor( + final @NotNull KafkaTemplate template) { + try { + final @NotNull Field field = KafkaTemplate.class.getDeclaredField("producerInterceptor"); + field.setAccessible(true); + return (ProducerInterceptor) field.get(template); + } catch (NoSuchFieldException | IllegalAccessException e) { + ScopesAdapter.getInstance() + .getOptions() + .getLogger() + .log( + SentryLevel.WARNING, + "Unable to read existing producerInterceptor from KafkaTemplate via reflection. " + + "If you had a custom ProducerInterceptor, it may be overwritten by Sentry's interceptor.", + e); + return null; + } + } + + @Override + public int getOrder() { + return Ordered.LOWEST_PRECEDENCE; + } +} diff --git a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryProducerInterceptor.java b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryProducerInterceptor.java new file mode 100644 index 0000000000..916fcceb26 --- /dev/null +++ b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryProducerInterceptor.java @@ -0,0 +1,111 @@ +package io.sentry.spring.jakarta.kafka; + +import io.sentry.BaggageHeader; +import io.sentry.IScopes; +import io.sentry.ISpan; +import io.sentry.SentryTraceHeader; +import io.sentry.SpanDataConvention; +import io.sentry.SpanOptions; +import io.sentry.SpanStatus; +import io.sentry.util.TracingUtils; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import org.apache.kafka.clients.producer.ProducerInterceptor; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.header.Headers; +import org.jetbrains.annotations.ApiStatus; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * A Kafka {@link ProducerInterceptor} that creates {@code queue.publish} spans and injects tracing + * headers into outgoing records. + * + *

The span starts and finishes synchronously in {@link #onSend(ProducerRecord)}, representing + * "message enqueued" semantics. This avoids cross-thread correlation complexity since {@link + * #onAcknowledgement(RecordMetadata, Exception)} runs on the Kafka I/O thread. + * + *

If the customer already has a {@link ProducerInterceptor}, the {@link + * SentryKafkaProducerBeanPostProcessor} composes both using Spring's {@link + * org.springframework.kafka.support.CompositeProducerInterceptor}. + */ +@ApiStatus.Internal +public final class SentryProducerInterceptor implements ProducerInterceptor { + + static final String TRACE_ORIGIN = "auto.queue.spring_jakarta.kafka.producer"; + static final String SENTRY_ENQUEUED_TIME_HEADER = "sentry-task-enqueued-time"; + + private final @NotNull IScopes scopes; + + public SentryProducerInterceptor(final @NotNull IScopes scopes) { + this.scopes = scopes; + } + + @Override + public @NotNull ProducerRecord onSend(final @NotNull ProducerRecord record) { + if (!scopes.getOptions().isEnableQueueTracing()) { + return record; + } + + final @Nullable ISpan activeSpan = scopes.getSpan(); + if (activeSpan == null || activeSpan.isNoOp()) { + return record; + } + + final @NotNull SpanOptions spanOptions = new SpanOptions(); + spanOptions.setOrigin(TRACE_ORIGIN); + final @NotNull ISpan span = activeSpan.startChild("queue.publish", record.topic(), spanOptions); + if (span.isNoOp()) { + return record; + } + + span.setData(SpanDataConvention.MESSAGING_SYSTEM, "kafka"); + span.setData(SpanDataConvention.MESSAGING_DESTINATION_NAME, record.topic()); + + try { + injectHeaders(record.headers(), span); + } catch (Throwable ignored) { + // Header injection must not break the send + } + + span.setStatus(SpanStatus.OK); + span.finish(); + + return record; + } + + @Override + public void onAcknowledgement( + final @Nullable RecordMetadata metadata, final @Nullable Exception exception) {} + + @Override + public void close() {} + + @Override + public void configure(final @Nullable Map configs) {} + + private void injectHeaders(final @NotNull Headers headers, final @NotNull ISpan span) { + final @Nullable TracingUtils.TracingHeaders tracingHeaders = + TracingUtils.trace(scopes, null, span); + if (tracingHeaders != null) { + final @NotNull SentryTraceHeader sentryTraceHeader = tracingHeaders.getSentryTraceHeader(); + headers.remove(sentryTraceHeader.getName()); + headers.add( + sentryTraceHeader.getName(), + sentryTraceHeader.getValue().getBytes(StandardCharsets.UTF_8)); + + final @Nullable BaggageHeader baggageHeader = tracingHeaders.getBaggageHeader(); + if (baggageHeader != null) { + headers.remove(baggageHeader.getName()); + headers.add( + baggageHeader.getName(), baggageHeader.getValue().getBytes(StandardCharsets.UTF_8)); + } + } + + headers.remove(SENTRY_ENQUEUED_TIME_HEADER); + headers.add( + SENTRY_ENQUEUED_TIME_HEADER, + String.valueOf(System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8)); + } +} diff --git a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessorTest.kt b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessorTest.kt new file mode 100644 index 0000000000..25e1d3348e --- /dev/null +++ b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessorTest.kt @@ -0,0 +1,78 @@ +package io.sentry.spring.jakarta.kafka + +import kotlin.test.Test +import kotlin.test.assertSame +import kotlin.test.assertTrue +import org.apache.kafka.clients.producer.ProducerInterceptor +import org.mockito.kotlin.mock +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.core.ProducerFactory +import org.springframework.kafka.support.CompositeProducerInterceptor + +class SentryKafkaProducerBeanPostProcessorTest { + + private fun readInterceptor(template: KafkaTemplate<*, *>): Any? { + val field = KafkaTemplate::class.java.getDeclaredField("producerInterceptor") + field.isAccessible = true + return field.get(template) + } + + @Test + fun `sets SentryProducerInterceptor on KafkaTemplate`() { + val template = KafkaTemplate(mock>()) + val processor = SentryKafkaProducerBeanPostProcessor() + + processor.postProcessAfterInitialization(template, "kafkaTemplate") + + assertTrue(readInterceptor(template) is SentryProducerInterceptor<*, *>) + } + + @Test + fun `does not double-wrap when SentryProducerInterceptor already set`() { + val template = KafkaTemplate(mock>()) + val processor = SentryKafkaProducerBeanPostProcessor() + + processor.postProcessAfterInitialization(template, "kafkaTemplate") + val firstInterceptor = readInterceptor(template) + + processor.postProcessAfterInitialization(template, "kafkaTemplate") + val secondInterceptor = readInterceptor(template) + + assertSame(firstInterceptor, secondInterceptor) + } + + @Test + fun `does not modify non-KafkaTemplate beans`() { + val someBean = "not a kafka template" + val processor = SentryKafkaProducerBeanPostProcessor() + + val result = processor.postProcessAfterInitialization(someBean, "someBean") + + assertSame(someBean, result) + } + + @Test + fun `returns the same bean instance`() { + val template = KafkaTemplate(mock>()) + val processor = SentryKafkaProducerBeanPostProcessor() + + val result = processor.postProcessAfterInitialization(template, "kafkaTemplate") + + assertSame(template, result, "BPP should return the same bean, not a replacement") + } + + @Test + fun `composes with existing customer interceptor using CompositeProducerInterceptor`() { + val template = KafkaTemplate(mock>()) + val customerInterceptor = mock>() + template.setProducerInterceptor(customerInterceptor) + + val processor = SentryKafkaProducerBeanPostProcessor() + processor.postProcessAfterInitialization(template, "kafkaTemplate") + + assertTrue( + readInterceptor(template) is CompositeProducerInterceptor<*, *>, + "Should use CompositeProducerInterceptor when existing interceptor is present", + ) + } +} diff --git a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryProducerInterceptorTest.kt b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryProducerInterceptorTest.kt new file mode 100644 index 0000000000..41ca6c2ee5 --- /dev/null +++ b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryProducerInterceptorTest.kt @@ -0,0 +1,142 @@ +package io.sentry.spring.jakarta.kafka + +import io.sentry.IScopes +import io.sentry.Sentry +import io.sentry.SentryOptions +import io.sentry.SentryTraceHeader +import io.sentry.SentryTracer +import io.sentry.TransactionContext +import io.sentry.test.initForTest +import java.nio.charset.StandardCharsets +import kotlin.test.AfterTest +import kotlin.test.BeforeTest +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertNotNull +import kotlin.test.assertSame +import kotlin.test.assertTrue +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.clients.producer.RecordMetadata +import org.apache.kafka.common.TopicPartition +import org.mockito.kotlin.mock +import org.mockito.kotlin.whenever + +class SentryProducerInterceptorTest { + + private lateinit var scopes: IScopes + private lateinit var options: SentryOptions + + @BeforeTest + fun setup() { + initForTest { it.dsn = "https://key@sentry.io/proj" } + scopes = mock() + options = + SentryOptions().apply { + dsn = "https://key@sentry.io/proj" + isEnableQueueTracing = true + } + whenever(scopes.options).thenReturn(options) + } + + @AfterTest + fun teardown() { + Sentry.close() + } + + private fun createTransaction(): SentryTracer { + val tx = SentryTracer(TransactionContext("tx", "op"), scopes) + whenever(scopes.span).thenReturn(tx) + return tx + } + + @Test + fun `creates queue publish span with correct op and data`() { + val tx = createTransaction() + val interceptor = SentryProducerInterceptor(scopes) + val record = ProducerRecord("my-topic", "key", "value") + + interceptor.onSend(record) + + assertEquals(1, tx.spans.size) + val span = tx.spans.first() + assertEquals("queue.publish", span.operation) + assertEquals("my-topic", span.description) + assertEquals("kafka", span.data["messaging.system"]) + assertEquals("my-topic", span.data["messaging.destination.name"]) + assertTrue(span.isFinished) + } + + @Test + fun `does not create span when queue tracing is disabled`() { + val tx = createTransaction() + options.isEnableQueueTracing = false + val interceptor = SentryProducerInterceptor(scopes) + val record = ProducerRecord("my-topic", "key", "value") + + interceptor.onSend(record) + + assertEquals(0, tx.spans.size) + } + + @Test + fun `does not create span when no active span`() { + whenever(scopes.span).thenReturn(null) + val interceptor = SentryProducerInterceptor(scopes) + val record = ProducerRecord("my-topic", "key", "value") + + val result = interceptor.onSend(record) + + assertSame(record, result) + } + + @Test + fun `injects sentry-trace, baggage, and enqueued-time headers`() { + createTransaction() + val interceptor = SentryProducerInterceptor(scopes) + val record = ProducerRecord("my-topic", "key", "value") + + val result = interceptor.onSend(record) + + val resultHeaders = result.headers() + val sentryTraceHeader = resultHeaders.lastHeader(SentryTraceHeader.SENTRY_TRACE_HEADER) + assertNotNull(sentryTraceHeader, "sentry-trace header should be injected") + + val enqueuedTimeHeader = + resultHeaders.lastHeader(SentryProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER) + assertNotNull(enqueuedTimeHeader, "sentry-task-enqueued-time header should be injected") + val enqueuedTime = String(enqueuedTimeHeader.value(), StandardCharsets.UTF_8).toLong() + assertTrue(enqueuedTime > 0, "enqueued time should be a positive epoch millis value") + } + + @Test + fun `span is finished synchronously in onSend`() { + val tx = createTransaction() + val interceptor = SentryProducerInterceptor(scopes) + val record = ProducerRecord("my-topic", "key", "value") + + interceptor.onSend(record) + + assertEquals(1, tx.spans.size) + assertTrue(tx.spans.first().isFinished, "span should be finished after onSend returns") + } + + @Test + fun `onAcknowledgement does not throw`() { + val interceptor = SentryProducerInterceptor(scopes) + val metadata = RecordMetadata(TopicPartition("my-topic", 0), 0, 0, 0, 0, 0) + + interceptor.onAcknowledgement(metadata, null) + } + + @Test + fun `close does not throw`() { + val interceptor = SentryProducerInterceptor(scopes) + + interceptor.close() + } + + @Test + fun `trace origin is set correctly`() { + assertEquals("auto.queue.spring_jakarta.kafka.producer", SentryProducerInterceptor.TRACE_ORIGIN) + } +}