diff --git a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java index e07f86fa26..ad4b87464a 100644 --- a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java +++ b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java @@ -11,6 +11,7 @@ import io.sentry.TransactionContext; import io.sentry.TransactionOptions; import io.sentry.util.SpanUtils; +import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.List; @@ -21,6 +22,7 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.springframework.kafka.listener.RecordInterceptor; +import org.springframework.kafka.support.KafkaHeaders; /** * A {@link RecordInterceptor} that creates {@code queue.process} transactions for incoming Kafka @@ -161,6 +163,11 @@ private boolean isIgnored() { transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_ID, messageId); } + final @Nullable Integer retryCount = retryCount(record); + if (retryCount != null) { + transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_RETRY_COUNT, retryCount); + } + final @Nullable String enqueuedTimeStr = headerValue(record, SentryProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER); if (enqueuedTimeStr != null) { @@ -178,6 +185,25 @@ private boolean isIgnored() { return transaction; } + private @Nullable Integer retryCount(final @NotNull ConsumerRecord record) { + final @Nullable Header header = record.headers().lastHeader(KafkaHeaders.DELIVERY_ATTEMPT); + if (header == null) { + return null; + } + + final byte[] value = header.value(); + if (value == null || value.length != Integer.BYTES) { + return null; + } + + final int attempt = ByteBuffer.wrap(value).getInt(); + if (attempt <= 0) { + return null; + } + + return attempt - 1; + } + private void finishStaleContext() { if (currentContext.get() != null) { finishSpan(SpanStatus.UNKNOWN, null); diff --git a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt index 206a43298e..57aef26bc0 100644 --- a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt +++ b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt @@ -7,13 +7,16 @@ import io.sentry.Sentry import io.sentry.SentryOptions import io.sentry.SentryTraceHeader import io.sentry.SentryTracer +import io.sentry.SpanDataConvention import io.sentry.TransactionContext import io.sentry.test.initForTest +import java.nio.ByteBuffer import java.nio.charset.StandardCharsets import kotlin.test.AfterTest import kotlin.test.BeforeTest import kotlin.test.Test import kotlin.test.assertEquals +import kotlin.test.assertNull import org.apache.kafka.clients.consumer.Consumer import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.header.internals.RecordHeaders @@ -24,6 +27,7 @@ import org.mockito.kotlin.never import org.mockito.kotlin.verify import org.mockito.kotlin.whenever import org.springframework.kafka.listener.RecordInterceptor +import org.springframework.kafka.support.KafkaHeaders class SentryKafkaRecordInterceptorTest { @@ -32,6 +36,7 @@ class SentryKafkaRecordInterceptorTest { private lateinit var options: SentryOptions private lateinit var consumer: Consumer private lateinit var lifecycleToken: ISentryLifecycleToken + private lateinit var transaction: SentryTracer @BeforeTest fun setup() { @@ -52,8 +57,9 @@ class SentryKafkaRecordInterceptorTest { whenever(forkedScopes.options).thenReturn(options) whenever(forkedScopes.makeCurrent()).thenReturn(lifecycleToken) - val tx = SentryTracer(TransactionContext("queue.process", "queue.process"), forkedScopes) - whenever(forkedScopes.startTransaction(any(), any())).thenReturn(tx) + transaction = SentryTracer(TransactionContext("queue.process", "queue.process"), forkedScopes) + whenever(forkedScopes.startTransaction(any(), any())) + .thenReturn(transaction) } @AfterTest @@ -81,6 +87,7 @@ class SentryKafkaRecordInterceptorTest { sentryTrace: String? = null, baggage: String? = null, enqueuedTime: Long? = null, + deliveryAttempt: Int? = null, ): ConsumerRecord { val headers = RecordHeaders() sentryTrace?.let { @@ -95,6 +102,12 @@ class SentryKafkaRecordInterceptorTest { it.toString().toByteArray(StandardCharsets.UTF_8), ) } + deliveryAttempt?.let { + headers.add( + KafkaHeaders.DELIVERY_ATTEMPT, + ByteBuffer.allocate(Int.SIZE_BYTES).putInt(it).array(), + ) + } val record = ConsumerRecord("my-topic", 0, 0L, "key", "value") headers.forEach { record.headers().add(it) } return record @@ -132,6 +145,26 @@ class SentryKafkaRecordInterceptorTest { verify(forkedScopes).continueTrace(org.mockito.kotlin.isNull(), org.mockito.kotlin.isNull()) } + @Test + fun `sets retry count from delivery attempt header`() { + val interceptor = SentryKafkaRecordInterceptor(scopes) + val record = createRecordWithHeaders(deliveryAttempt = 3) + + withMockSentry { interceptor.intercept(record, consumer) } + + assertEquals(2, transaction.data?.get(SpanDataConvention.MESSAGING_MESSAGE_RETRY_COUNT)) + } + + @Test + fun `does not set retry count when delivery attempt header is missing`() { + val interceptor = SentryKafkaRecordInterceptor(scopes) + val record = createRecord() + + withMockSentry { interceptor.intercept(record, consumer) } + + assertNull(transaction.data?.get(SpanDataConvention.MESSAGING_MESSAGE_RETRY_COUNT)) + } + @Test fun `does not create span when queue tracing is disabled`() { options.isEnableQueueTracing = false