Fix/1702 stream resume run #1858

Open
luoxiao6645 wants to merge 2 commits into bytedance:main from luoxiao6645:fix/1702-stream-resume-run-id

Conversation

@luoxiao6645
Contributor

@luoxiao6645 luoxiao6645 commented Apr 4, 2026

Fixes #1702

Summary

This fixes the internal error triggered when reopening a thread with LangGraph resumable streams enabled.

The main issue was that our gateway returned a Content-Location header that polluted the SDK-cached run_id, which later caused joinStream() to request a non-existent run.
This PR also completes the backend resume path by forwarding Last-Event-ID and replaying buffered events from the in-memory stream bridge.

This branch has been rebased onto the latest upstream/main, and the previous conflicts in memory.py and hooks.ts were resolved locally.

What Changed

  • Return canonical run resource paths in Content-Location
    • backend/app/gateway/routers/thread_runs.py
    • backend/app/gateway/routers/runs.py
  • Forward Last-Event-ID from the request to the stream bridge
    • backend/app/gateway/services.py
  • Replace the in-memory single-consumer queue with a bounded event log that supports replay after Last-Event-ID
    • backend/packages/harness/deerflow/runtime/stream_bridge/memory.py
  • Add replay and buffer-trim coverage for the stream bridge
    • backend/tests/test_stream_bridge.py
  • Sanitize previously polluted cached run ids on the frontend before reconnect
    • frontend/src/core/threads/hooks.ts

User-Visible Effect

  • Reopening a thread no longer reuses a corrupted run_id
  • Resumable stream reconnect now replays retained buffered events instead of only reconnecting live
  • Existing bad sessionStorage values are normalized on read

Validation

Backend:

./backend/.venv/bin/python -m pytest backend/tests/test_stream_bridge.py backend/tests/test_gateway_services.py backend/tests/test_sse_format.py -q
./backend/.venv/bin/ruff check \
  backend/app/gateway/routers/thread_runs.py \
  backend/app/gateway/routers/runs.py \
  backend/app/gateway/services.py \
  backend/packages/harness/deerflow/runtime/stream_bridge/memory.py \
  backend/tests/test_stream_bridge.py

Frontend (run from a WSL-native filesystem path):

./node_modules/.bin/eslint src/core/threads/hooks.ts
./node_modules/.bin/tsc --noEmit --pretty false

Notes

  • WSL lint/typecheck hangs when the frontend is run from /mnt/d/...; validation was completed from a WSL-native path.
  • stream_resumable is still not a strict runtime feature flag; current behavior is bounded replay by default for the in-memory bridge.

# Conflicts:
#	backend/packages/harness/deerflow/runtime/stream_bridge/memory.py
#	frontend/src/core/threads/hooks.ts
@luoxiao6645 luoxiao6645 force-pushed the fix/1702-stream-resume-run-id branch from 2c6ab81 to 28ea882 Compare April 4, 2026 16:13
@WillemJiang WillemJiang requested a review from Copilot April 5, 2026 02:59
@WillemJiang
Collaborator

@luoxiao6645 thanks for your contribution, please take some time to fix the lint and unit test errors.

Contributor

Copilot AI left a comment


Pull request overview

Fixes the internal error when reopening a thread with LangGraph resumable streams by preventing polluted run_id caching and completing the resume/replay path end-to-end.

Changes:

  • Return canonical run resource paths via Content-Location to avoid SDK run-id corruption.
  • Forward Last-Event-ID into the backend stream bridge and add bounded replay via an in-memory event log.
  • Sanitize previously stored/polluted run ids on the frontend before reconnecting.

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 5 comments.

Summary per file:

  • frontend/src/core/threads/hooks.ts: Adds run-id normalization plus a custom storage wrapper to sanitize cached run ids used during reconnect.
  • backend/app/gateway/routers/thread_runs.py: Fixes Content-Location for thread run streaming responses to point to canonical run resource paths.
  • backend/app/gateway/routers/runs.py: Adds Content-Location for stateless stream responses to match canonical run resource paths.
  • backend/app/gateway/services.py: Forwards Last-Event-ID into bridge.subscribe() to enable resumable/replay behavior.
  • backend/packages/harness/deerflow/runtime/stream_bridge/memory.py: Replaces the per-run queue with a bounded in-memory event log plus replay support keyed off Last-Event-ID.
  • backend/tests/test_stream_bridge.py: Updates/adds tests for bounded history and replay behavior (but see review comments for remaining inconsistencies).

return trimmed.split(/[/?#]/, 1)[0] ?? null;
Copilot AI Apr 5, 2026


normalizeStoredRunId() can return an empty string for stored values that start with "/" (e.g. "/api/threads/.../runs/") because split(/[/?#]/, 1)[0] yields "". This would cause the wrapper to delete the cache entry and disable reconnect even though a valid run id exists. Consider extracting the last non-empty path segment (or specifically the segment after "/runs/") as a fallback, and treating empty-string results as null.

Suggested change
- return trimmed.split(/[/?#]/, 1)[0] ?? null;
+ const pathWithoutQueryOrHash = trimmed.split(/[?#]/, 1)[0]?.trim() ?? "";
+ if (!pathWithoutQueryOrHash) {
+   return null;
+ }
+ const runsMarker = "/runs/";
+ const runsIndex = pathWithoutQueryOrHash.lastIndexOf(runsMarker);
+ if (runsIndex >= 0) {
+   const runIdAfterMarker = pathWithoutQueryOrHash
+     .slice(runsIndex + runsMarker.length)
+     .split("/", 1)[0]
+     ?.trim();
+   if (runIdAfterMarker) {
+     return runIdAfterMarker;
+   }
+ }
+ const segments = pathWithoutQueryOrHash
+   .split("/")
+   .map((segment) => segment.trim())
+   .filter(Boolean);
+ return segments.at(-1) ?? null;

Comment on lines 62 to 71
@pytest.mark.anyio
async def test_cleanup(bridge: MemoryStreamBridge):
    """After cleanup, the run's queue is removed."""
    run_id = "run-cleanup"
    await bridge.publish(run_id, "test", {})
    assert run_id in bridge._queues
    assert run_id in bridge._streams

    await bridge.cleanup(run_id)
    assert run_id not in bridge._queues
    assert run_id not in bridge._streams
    assert run_id not in bridge._counters

Copilot AI Apr 5, 2026


This test file was partially updated to the new event-log implementation, but several tests further down still reference removed internals (e.g. bridge._queues, bridge._dropped_counts, dropped_total/dropped_count). As-is, the suite will fail against the new MemoryStreamBridge; please update/remove those remaining queue/drop-counter assertions to align with the new buffering semantics.

- if last_event_id is not None:
-     logger.debug("last_event_id=%s accepted but ignored (memory bridge has no replay)", last_event_id)
+ stream = self._get_or_create_stream(run_id)
+ next_offset = self._resolve_start_offset(stream, last_event_id)

Copilot AI Apr 5, 2026


subscribe() resolves the initial replay offset by iterating over stream.events in _resolve_start_offset() without holding stream.condition. Since publish() can concurrently append/trim the list and advance start_offset under the condition, this can race and miscompute the starting offset (leading to missed/duplicated replay). Resolve last_event_id while holding the same condition used for mutations (e.g., move _resolve_start_offset inside an async with stream.condition block or add a dedicated lock around events/start_offset reads).

Suggested change
- next_offset = self._resolve_start_offset(stream, last_event_id)
+ async with stream.condition:
+     next_offset = self._resolve_start_offset(stream, last_event_id)

@@ -62,36 +62,34 @@
@pytest.mark.anyio
async def test_cleanup(bridge: MemoryStreamBridge):
    """After cleanup, the run's queue is removed."""

Copilot AI Apr 5, 2026


Docstring is now inaccurate: MemoryStreamBridge no longer uses a per-run queue. Update this test description to refer to the run's stream/event log being removed after cleanup.

Suggested change
"""After cleanup, the run's queue is removed."""
"""After cleanup, the run's stream/event log is removed."""

Comment on lines +179 to +195
next_offset = stream.start_offset + 1  # subscriber already consumed e1

await bridge.publish(run_id, "e3", {"step": 3})  # trims e1
await bridge.publish_end(run_id)

received = []
async for entry in bridge.subscribe(
    run_id,
    last_event_id=stream.events[0].id,
    heartbeat_interval=1.0,
):
    received.append(entry)
    if entry is END_SENTINEL:
        break

assert [entry.event for entry in received[:-1]] == ["e3"]
assert next_offset == 1

Copilot AI Apr 5, 2026


test_slow_subscriber_does_not_skip_after_buffer_trim() computes next_offset but never uses it to drive subscription logic; the final assert next_offset == 1 is effectively a constant and doesn't validate the intended behavior described in the docstring. Consider asserting against stream.start_offset changes and/or using last_event_id/consumption state to prove the subscriber resumes at the correct absolute offset after trimming.

Suggested change
- next_offset = stream.start_offset + 1  # subscriber already consumed e1
- await bridge.publish(run_id, "e3", {"step": 3})  # trims e1
- await bridge.publish_end(run_id)
- received = []
- async for entry in bridge.subscribe(
-     run_id,
-     last_event_id=stream.events[0].id,
-     heartbeat_interval=1.0,
- ):
-     received.append(entry)
-     if entry is END_SENTINEL:
-         break
- assert [entry.event for entry in received[:-1]] == ["e3"]
- assert next_offset == 1
+ e1_id = stream.events[0].id  # subscriber already consumed e1
+ assert stream.start_offset == 0
+ await bridge.publish(run_id, "e3", {"step": 3})  # trims e1
+ assert stream.start_offset == 1
+ assert [entry.event for entry in stream.events] == ["e2", "e3"]
+ resumed_after_e1 = []
+ async for entry in bridge.subscribe(
+     run_id,
+     last_event_id=e1_id,
+     heartbeat_interval=1.0,
+ ):
+     resumed_after_e1.append(entry)
+     break
+ assert [entry.event for entry in resumed_after_e1] == ["e2"]
+ e2_id = resumed_after_e1[0].id
+ await bridge.publish_end(run_id)
+ resumed_after_e2 = []
+ async for entry in bridge.subscribe(
+     run_id,
+     last_event_id=e2_id,
+     heartbeat_interval=1.0,
+ ):
+     resumed_after_e2.append(entry)
+     if entry is END_SENTINEL:
+         break
+ assert [entry.event for entry in resumed_after_e2[:-1]] == ["e3"]
+ assert resumed_after_e2[-1] is END_SENTINEL
@WillemJiang WillemJiang added the question Further information is requested label Apr 5, 2026


Development

Successfully merging this pull request may close these issues.

An internal error occurred
