From 24cff6df1c2738a9c9dbf1cbfff9fd0da2ec6da2 Mon Sep 17 00:00:00 2001 From: Alexander Dinauer Date: Fri, 10 Apr 2026 13:28:29 +0200 Subject: [PATCH] fix(spring-jakarta): [Queue Instrumentation 13] Align enqueue time with Python Store sentry-task-enqueued-time as epoch seconds and compute receive latency from seconds on the consumer side. This aligns Java Kafka queue instrumentation with sentry-python Celery behavior for cross-SDK interoperability. Co-Authored-By: Claude --- .../kafka/SentryKafkaRecordInterceptor.java | 6 ++++-- .../kafka/SentryProducerInterceptor.java | 4 +++- .../kafka/SentryKafkaRecordInterceptorTest.kt | 17 +++++++++++++++-- .../kafka/SentryProducerInterceptorTest.kt | 4 ++-- 4 files changed, 24 insertions(+), 7 deletions(-) diff --git a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java index ad4b87464a..a48a3ab970 100644 --- a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java +++ b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java @@ -1,6 +1,7 @@ package io.sentry.spring.jakarta.kafka; import io.sentry.BaggageHeader; +import io.sentry.DateUtils; import io.sentry.IScopes; import io.sentry.ISentryLifecycleToken; import io.sentry.ITransaction; @@ -172,8 +173,9 @@ private boolean isIgnored() { headerValue(record, SentryProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER); if (enqueuedTimeStr != null) { try { - final long enqueuedTime = Long.parseLong(enqueuedTimeStr); - final long latencyMs = System.currentTimeMillis() - enqueuedTime; + final double enqueuedTimeSeconds = Double.parseDouble(enqueuedTimeStr); + final double nowSeconds = DateUtils.millisToSeconds(System.currentTimeMillis()); + final long latencyMs = (long) ((nowSeconds - enqueuedTimeSeconds) * 1000); if (latencyMs >= 0) { transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_RECEIVE_LATENCY, latencyMs); } diff --git a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryProducerInterceptor.java b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryProducerInterceptor.java index 4caa4a8b54..7e589511c4 100644 --- a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryProducerInterceptor.java +++ b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryProducerInterceptor.java @@ -1,6 +1,7 @@ package io.sentry.spring.jakarta.kafka; import io.sentry.BaggageHeader; +import io.sentry.DateUtils; import io.sentry.IScopes; import io.sentry.ISpan; import io.sentry.SentryTraceHeader; @@ -107,6 +108,7 @@ private void injectHeaders(final @NotNull Headers headers, final @NotNull ISpan headers.remove(SENTRY_ENQUEUED_TIME_HEADER); headers.add( SENTRY_ENQUEUED_TIME_HEADER, - String.valueOf(System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8)); + String.valueOf(DateUtils.millisToSeconds(System.currentTimeMillis())) + .getBytes(StandardCharsets.UTF_8)); } } diff --git a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt index 57aef26bc0..15bbb6a293 100644 --- a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt +++ b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt @@ -17,6 +17,7 @@ import kotlin.test.BeforeTest import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertNull +import kotlin.test.assertTrue import org.apache.kafka.clients.consumer.Consumer import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.header.internals.RecordHeaders @@ -86,7 +87,7 @@ class SentryKafkaRecordInterceptorTest { private fun createRecordWithHeaders( sentryTrace: String? = null, baggage: String? = null, - enqueuedTime: Long? = null, + enqueuedTime: String? = null, deliveryAttempt: Int? = null, ): ConsumerRecord { val headers = RecordHeaders() @@ -99,7 +100,7 @@ class SentryKafkaRecordInterceptorTest { enqueuedTime?.let { headers.add( SentryProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER, - it.toString().toByteArray(StandardCharsets.UTF_8), + it.toByteArray(StandardCharsets.UTF_8), ) } deliveryAttempt?.let { @@ -165,6 +166,18 @@ class SentryKafkaRecordInterceptorTest { assertNull(transaction.data?.get(SpanDataConvention.MESSAGING_MESSAGE_RETRY_COUNT)) } + @Test + fun `sets receive latency from enqueued time in epoch seconds`() { + val interceptor = SentryKafkaRecordInterceptor(scopes) + val enqueuedTime = (System.currentTimeMillis() / 1000.0 - 1.0).toString() + val record = createRecordWithHeaders(enqueuedTime = enqueuedTime) + + withMockSentry { interceptor.intercept(record, consumer) } + + val latency = transaction.data?.get(SpanDataConvention.MESSAGING_MESSAGE_RECEIVE_LATENCY) + assertTrue(latency is Long && latency >= 0) + } + @Test fun `does not create span when queue tracing is disabled`() { options.isEnableQueueTracing = false diff --git a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryProducerInterceptorTest.kt b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryProducerInterceptorTest.kt index 41ca6c2ee5..f877b1e7d2 100644 --- a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryProducerInterceptorTest.kt +++ b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryProducerInterceptorTest.kt @@ -104,8 +104,8 @@ class SentryProducerInterceptorTest { val enqueuedTimeHeader = resultHeaders.lastHeader(SentryProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER) assertNotNull(enqueuedTimeHeader, "sentry-task-enqueued-time header should be injected") - val enqueuedTime = String(enqueuedTimeHeader.value(), StandardCharsets.UTF_8).toLong() - assertTrue(enqueuedTime > 0, "enqueued time should be a positive epoch millis value") + val enqueuedTime = String(enqueuedTimeHeader.value(), StandardCharsets.UTF_8).toDouble() + assertTrue(enqueuedTime > 0, "enqueued time should be a positive epoch seconds value") } @Test