diff --git a/sentry-spring-jakarta/api/sentry-spring-jakarta.api b/sentry-spring-jakarta/api/sentry-spring-jakarta.api index 57d46f05bc..0ba6c77725 100644 --- a/sentry-spring-jakarta/api/sentry-spring-jakarta.api +++ b/sentry-spring-jakarta/api/sentry-spring-jakarta.api @@ -260,6 +260,7 @@ public final class io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor : public fun (Lio/sentry/IScopes;)V public fun (Lio/sentry/IScopes;Lorg/springframework/kafka/listener/RecordInterceptor;)V public fun afterRecord (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Lorg/apache/kafka/clients/consumer/Consumer;)V + public fun clearThreadState (Lorg/apache/kafka/clients/consumer/Consumer;)V public fun failure (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Ljava/lang/Exception;Lorg/apache/kafka/clients/consumer/Consumer;)V public fun intercept (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Lorg/apache/kafka/clients/consumer/Consumer;)Lorg/apache/kafka/clients/consumer/ConsumerRecord; public fun success (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Lorg/apache/kafka/clients/consumer/Consumer;)V diff --git a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java index e7b13f08dc..d11f7f8a67 100644 --- a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java +++ b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java @@ -52,6 +52,8 @@ public SentryKafkaRecordInterceptor( return delegateIntercept(record, consumer); } + finishStaleContext(); + final @NotNull IScopes forkedScopes = scopes.forkedScopes("SentryKafkaRecordInterceptor"); final @NotNull ISentryLifecycleToken lifecycleToken = forkedScopes.makeCurrent(); @@ -98,6 +100,11 @@ public void afterRecord( } } + @Override + public void clearThreadState(final @NotNull Consumer consumer) { + finishStaleContext(); + } + private @Nullable ConsumerRecord delegateIntercept( final @NotNull ConsumerRecord record, final @NotNull Consumer consumer) { if (delegate != null) { @@ -165,6 +172,12 @@ public void afterRecord( return transaction; } + private void finishStaleContext() { + if (currentContext.get() != null) { + finishSpan(SpanStatus.UNKNOWN, null); + } + } + private void finishSpan(final @NotNull SpanStatus status, final @Nullable Throwable throwable) { final @Nullable SentryRecordContext ctx = currentContext.get(); if (ctx == null) { diff --git a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt index 370da75585..0688af70db 100644 --- a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt +++ b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt @@ -208,4 +208,64 @@ class SentryKafkaRecordInterceptorTest { SentryKafkaRecordInterceptor.TRACE_ORIGIN, ) } + + @Test + fun `clearThreadState cleans up stale context`() { + val interceptor = SentryKafkaRecordInterceptor(scopes) + val record = createRecord() + + // intercept sets up context in ThreadLocal + interceptor.intercept(record, consumer) + + // clearThreadState should clean up without success/failure being called + interceptor.clearThreadState(consumer) + + // lifecycle token should have been closed + verify(lifecycleToken).close() + } + + @Test + fun `clearThreadState is no-op when no context exists`() { + val interceptor = SentryKafkaRecordInterceptor(scopes) + + // should not throw + interceptor.clearThreadState(consumer) + } + + @Test + fun `intercept cleans up stale context from previous record`() { + val lifecycleToken2 = mock() + val forkedScopes2 = mock() + whenever(forkedScopes2.options).thenReturn(options) + whenever(forkedScopes2.makeCurrent()).thenReturn(lifecycleToken2) + val tx2 = SentryTracer(TransactionContext("queue.process", "queue.process"), forkedScopes2) + whenever(forkedScopes2.startTransaction(any(), any())).thenReturn(tx2) + + var callCount = 0 + whenever(scopes.forkedScopes(any())).thenAnswer { + callCount++ + if (callCount == 1) { + val forkedScopes1 = mock() + whenever(forkedScopes1.options).thenReturn(options) + whenever(forkedScopes1.makeCurrent()).thenReturn(lifecycleToken) + val tx1 = SentryTracer(TransactionContext("queue.process", "queue.process"), forkedScopes1) + whenever(forkedScopes1.startTransaction(any(), any())).thenReturn(tx1) + forkedScopes1 + } else { + forkedScopes2 + } + } + + val interceptor = SentryKafkaRecordInterceptor(scopes) + val record = createRecord() + + // First intercept sets up context + interceptor.intercept(record, consumer) + + // Second intercept without success/failure — should clean up stale context first + interceptor.intercept(record, consumer) + + // First lifecycle token should have been closed by the defensive cleanup + verify(lifecycleToken).close() + } }