Conversation
# Conflicts: # backend/packages/harness/deerflow/runtime/stream_bridge/memory.py # frontend/src/core/threads/hooks.ts
2c6ab81 to
28ea882
Compare
|
@luoxiao6645 thanks for your contribution, please take some time to fix the lint and unit test errors. |
There was a problem hiding this comment.
Pull request overview
Fixes the internal error when reopening a thread with LangGraph resumable streams by preventing polluted run_id caching and completing the resume/replay path end-to-end.
Changes:
- Return canonical run resource paths via `Content-Location` to avoid SDK run-id corruption.
- Forward `Last-Event-ID` into the backend stream bridge and add bounded replay via an in-memory event log.
- Sanitize previously stored/polluted run ids on the frontend before reconnecting.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| frontend/src/core/threads/hooks.ts | Adds run-id normalization + a custom storage wrapper to sanitize cached run ids used during reconnect. |
| backend/app/gateway/routers/thread_runs.py | Fixes Content-Location for thread run streaming responses to point to canonical run resource paths. |
| backend/app/gateway/routers/runs.py | Adds Content-Location for stateless stream responses to match canonical run resource paths. |
| backend/app/gateway/services.py | Forwards Last-Event-ID into bridge.subscribe() to enable resumable/replay behavior. |
| backend/packages/harness/deerflow/runtime/stream_bridge/memory.py | Replaces per-run queue with bounded in-memory event log + replay support keyed off Last-Event-ID. |
| backend/tests/test_stream_bridge.py | Updates/adds tests for bounded history and replay behavior (but see review comments for remaining inconsistencies). |
| } | ||
| } | ||
|
|
||
| return trimmed.split(/[/?#]/, 1)[0] ?? null; |
There was a problem hiding this comment.
normalizeStoredRunId() can return an empty string for stored values that start with "/" (e.g. "/api/threads/.../runs/") because split(/[/?#]/, 1)[0] yields "". This would cause the wrapper to delete the cache entry and disable reconnect even though a valid run id exists. Consider extracting the last non-empty path segment (or specifically the segment after "/runs/") as a fallback, and treating empty-string results as null.
| return trimmed.split(/[/?#]/, 1)[0] ?? null; | |
| const pathWithoutQueryOrHash = trimmed.split(/[?#]/, 1)[0]?.trim() ?? ""; | |
| if (!pathWithoutQueryOrHash) { | |
| return null; | |
| } | |
| const runsMarker = "/runs/"; | |
| const runsIndex = pathWithoutQueryOrHash.lastIndexOf(runsMarker); | |
| if (runsIndex >= 0) { | |
| const runIdAfterMarker = pathWithoutQueryOrHash | |
| .slice(runsIndex + runsMarker.length) | |
| .split("/", 1)[0] | |
| ?.trim(); | |
| if (runIdAfterMarker) { | |
| return runIdAfterMarker; | |
| } | |
| } | |
| const segments = pathWithoutQueryOrHash | |
| .split("/") | |
| .map((segment) => segment.trim()) | |
| .filter(Boolean); | |
| return segments.at(-1) ?? null; |
| @pytest.mark.anyio | ||
| async def test_cleanup(bridge: MemoryStreamBridge): | ||
| """After cleanup, the run's queue is removed.""" | ||
| run_id = "run-cleanup" | ||
| await bridge.publish(run_id, "test", {}) | ||
| assert run_id in bridge._queues | ||
| assert run_id in bridge._streams | ||
|
|
||
| await bridge.cleanup(run_id) | ||
| assert run_id not in bridge._queues | ||
| assert run_id not in bridge._streams | ||
| assert run_id not in bridge._counters |
There was a problem hiding this comment.
This test file was partially updated to the new event-log implementation, but several tests further down still reference removed internals (e.g. bridge._queues, bridge._dropped_counts, dropped_total/dropped_count). As-is, the suite will fail against the new MemoryStreamBridge; please update/remove those remaining queue/drop-counter assertions to align with the new buffering semantics.
| if last_event_id is not None: | ||
| logger.debug("last_event_id=%s accepted but ignored (memory bridge has no replay)", last_event_id) | ||
| stream = self._get_or_create_stream(run_id) | ||
| next_offset = self._resolve_start_offset(stream, last_event_id) |
There was a problem hiding this comment.
subscribe() resolves the initial replay offset by iterating over stream.events in _resolve_start_offset() without holding stream.condition. Since publish() can concurrently append/trim the list and advance start_offset under the condition, this can race and miscompute the starting offset (leading to missed/duplicated replay). Resolve last_event_id while holding the same condition used for mutations (e.g., move _resolve_start_offset inside an async with stream.condition block or add a dedicated lock around events/start_offset reads).
| next_offset = self._resolve_start_offset(stream, last_event_id) | |
| async with stream.condition: | |
| next_offset = self._resolve_start_offset(stream, last_event_id) |
| @@ -62,36 +62,34 @@ | |||
| @pytest.mark.anyio | |||
| async def test_cleanup(bridge: MemoryStreamBridge): | |||
| """After cleanup, the run's queue is removed.""" | |||
There was a problem hiding this comment.
Docstring is now inaccurate: MemoryStreamBridge no longer uses a per-run queue. Update this test description to refer to the run's stream/event log being removed after cleanup.
| """After cleanup, the run's queue is removed.""" | |
| """After cleanup, the run's stream/event log is removed.""" |
| next_offset = stream.start_offset + 1 # subscriber already consumed e1 | ||
|
|
||
| await bridge.publish(run_id, "e3", {"step": 3}) # trims e1 | ||
| await bridge.publish_end(run_id) | ||
|
|
||
| received = [] | ||
| async for entry in bridge.subscribe( | ||
| run_id, | ||
| last_event_id=stream.events[0].id, | ||
| heartbeat_interval=1.0, | ||
| ): | ||
| received.append(entry) | ||
| if entry is END_SENTINEL: | ||
| break | ||
|
|
||
| assert [entry.event for entry in received[:-1]] == ["e3"] | ||
| assert next_offset == 1 |
There was a problem hiding this comment.
test_slow_subscriber_does_not_skip_after_buffer_trim() computes next_offset but never uses it to drive subscription logic; the final assert next_offset == 1 is effectively a constant and doesn't validate the intended behavior described in the docstring. Consider asserting against stream.start_offset changes and/or using last_event_id/consumption state to prove the subscriber resumes at the correct absolute offset after trimming.
| next_offset = stream.start_offset + 1 # subscriber already consumed e1 | |
| await bridge.publish(run_id, "e3", {"step": 3}) # trims e1 | |
| await bridge.publish_end(run_id) | |
| received = [] | |
| async for entry in bridge.subscribe( | |
| run_id, | |
| last_event_id=stream.events[0].id, | |
| heartbeat_interval=1.0, | |
| ): | |
| received.append(entry) | |
| if entry is END_SENTINEL: | |
| break | |
| assert [entry.event for entry in received[:-1]] == ["e3"] | |
| assert next_offset == 1 | |
| e1_id = stream.events[0].id # subscriber already consumed e1 | |
| assert stream.start_offset == 0 | |
| await bridge.publish(run_id, "e3", {"step": 3}) # trims e1 | |
| assert stream.start_offset == 1 | |
| assert [entry.event for entry in stream.events] == ["e2", "e3"] | |
| resumed_after_e1 = [] | |
| async for entry in bridge.subscribe( | |
| run_id, | |
| last_event_id=e1_id, | |
| heartbeat_interval=1.0, | |
| ): | |
| resumed_after_e1.append(entry) | |
| break | |
| assert [entry.event for entry in resumed_after_e1] == ["e2"] | |
| e2_id = resumed_after_e1[0].id | |
| await bridge.publish_end(run_id) | |
| resumed_after_e2 = [] | |
| async for entry in bridge.subscribe( | |
| run_id, | |
| last_event_id=e2_id, | |
| heartbeat_interval=1.0, | |
| ): | |
| resumed_after_e2.append(entry) | |
| if entry is END_SENTINEL: | |
| break | |
| assert [entry.event for entry in resumed_after_e2[:-1]] == ["e3"] | |
| assert resumed_after_e2[-1] is END_SENTINEL |
Fixes #1702
Summary
This fixes the internal error triggered when reopening a thread with LangGraph resumable streams enabled.
The main issue was that our gateway returned a `Content-Location` header that polluted the SDK-cached `run_id`, which later caused `joinStream()` to request a non-existent run.
This PR also completes the backend resume path by forwarding `Last-Event-ID` and replaying buffered events from the in-memory stream bridge.
This branch has been rebased onto the latest `upstream/main`, and the previous conflicts in `memory.py` and `hooks.ts` were resolved locally.
What Changed
- Fix `Content-Location` in `backend/app/gateway/routers/thread_runs.py` and `backend/app/gateway/routers/runs.py`.
- Forward `Last-Event-ID` from the request to the stream bridge in `backend/app/gateway/services.py`.
- Replay buffered events keyed off `Last-Event-ID` in `backend/packages/harness/deerflow/runtime/stream_bridge/memory.py`.
- Update tests in `backend/tests/test_stream_bridge.py`.
- Sanitize cached run ids in `frontend/src/core/threads/hooks.ts`.
User-Visible Effect
Stored `run_id` values in `sessionStorage` are normalized on read
Backend: