fix (tui): preserve active stream during concurrent run finals

This commit is contained in:
Vignesh Natarajan
2026-02-14 18:24:58 -08:00
parent a7eb0dd9a5
commit 61228639c2
2 changed files with 98 additions and 35 deletions
+45
View File
@@ -364,6 +364,51 @@ describe("tui-event-handlers: handleAgentEvent", () => {
expect(loadHistory).toHaveBeenCalledTimes(1); expect(loadHistory).toHaveBeenCalledTimes(1);
}); });
it("does not reload history or clear active run when another run final arrives mid-stream", () => {
const state = makeState({ activeChatRunId: "run-active" });
const { chatLog, tui, setActivityStatus, loadHistory, isLocalRunId, forgetLocalRunId } =
makeContext(state);
const { handleChatEvent } = createEventHandlers({
chatLog,
tui,
state,
setActivityStatus,
loadHistory,
isLocalRunId,
forgetLocalRunId,
});
handleChatEvent({
runId: "run-active",
sessionKey: state.currentSessionKey,
state: "delta",
message: { content: "partial" },
});
loadHistory.mockClear();
setActivityStatus.mockClear();
handleChatEvent({
runId: "run-other",
sessionKey: state.currentSessionKey,
state: "final",
message: { content: [{ type: "text", text: "other final" }] },
});
expect(loadHistory).not.toHaveBeenCalled();
expect(state.activeChatRunId).toBe("run-active");
expect(setActivityStatus).not.toHaveBeenCalledWith("idle");
handleChatEvent({
runId: "run-active",
sessionKey: state.currentSessionKey,
state: "delta",
message: { content: "continued" },
});
expect(chatLog.updateAssistant).toHaveBeenLastCalledWith("continued", "run-active");
});
it("drops streaming assistant when chat final has no message", () => { it("drops streaming assistant when chat final has no message", () => {
const state = makeState({ activeChatRunId: null }); const state = makeState({ activeChatRunId: null });
const { chatLog, tui, setActivityStatus } = makeContext(state); const { chatLog, tui, setActivityStatus } = makeContext(state);
+48 -30
View File
@@ -79,6 +79,31 @@ export function createEventHandlers(context: EventHandlerContext) {
pruneRunMap(finalizedRuns); pruneRunMap(finalizedRuns);
}; };
const clearActiveRunIfMatch = (runId: string) => {
if (state.activeChatRunId === runId) {
state.activeChatRunId = null;
}
};
const hasConcurrentActiveRun = (runId: string) => {
const activeRunId = state.activeChatRunId;
if (!activeRunId || activeRunId === runId) {
return false;
}
return sessionRuns.has(activeRunId);
};
const maybeRefreshHistoryForRun = (runId: string) => {
if (isLocalRunId?.(runId)) {
forgetLocalRunId?.(runId);
return;
}
if (hasConcurrentActiveRun(runId)) {
return;
}
void loadHistory?.();
};
const handleChatEvent = (payload: unknown) => { const handleChatEvent = (payload: unknown) => {
if (!payload || typeof payload !== "object") { if (!payload || typeof payload !== "object") {
return; return;
@@ -109,43 +134,36 @@ export function createEventHandlers(context: EventHandlerContext) {
setActivityStatus("streaming"); setActivityStatus("streaming");
} }
if (evt.state === "final") { if (evt.state === "final") {
const wasActiveRun = state.activeChatRunId === evt.runId;
if (!evt.message) { if (!evt.message) {
if (isLocalRunId?.(evt.runId)) { maybeRefreshHistoryForRun(evt.runId);
forgetLocalRunId?.(evt.runId);
} else {
void loadHistory?.();
}
chatLog.dropAssistant(evt.runId); chatLog.dropAssistant(evt.runId);
noteFinalizedRun(evt.runId); noteFinalizedRun(evt.runId);
state.activeChatRunId = null; clearActiveRunIfMatch(evt.runId);
if (wasActiveRun) {
setActivityStatus("idle"); setActivityStatus("idle");
}
void refreshSessionInfo?.(); void refreshSessionInfo?.();
tui.requestRender(); tui.requestRender();
return; return;
} }
if (isCommandMessage(evt.message)) { if (isCommandMessage(evt.message)) {
if (isLocalRunId?.(evt.runId)) { maybeRefreshHistoryForRun(evt.runId);
forgetLocalRunId?.(evt.runId);
} else {
void loadHistory?.();
}
const text = extractTextFromMessage(evt.message); const text = extractTextFromMessage(evt.message);
if (text) { if (text) {
chatLog.addSystem(text); chatLog.addSystem(text);
} }
streamAssembler.drop(evt.runId); streamAssembler.drop(evt.runId);
noteFinalizedRun(evt.runId); noteFinalizedRun(evt.runId);
state.activeChatRunId = null; clearActiveRunIfMatch(evt.runId);
if (wasActiveRun) {
setActivityStatus("idle"); setActivityStatus("idle");
}
void refreshSessionInfo?.(); void refreshSessionInfo?.();
tui.requestRender(); tui.requestRender();
return; return;
} }
if (isLocalRunId?.(evt.runId)) { maybeRefreshHistoryForRun(evt.runId);
forgetLocalRunId?.(evt.runId);
} else {
void loadHistory?.();
}
const stopReason = const stopReason =
evt.message && typeof evt.message === "object" && !Array.isArray(evt.message) evt.message && typeof evt.message === "object" && !Array.isArray(evt.message)
? typeof (evt.message as Record<string, unknown>).stopReason === "string" ? typeof (evt.message as Record<string, unknown>).stopReason === "string"
@@ -156,36 +174,36 @@ export function createEventHandlers(context: EventHandlerContext) {
const finalText = streamAssembler.finalize(evt.runId, evt.message, state.showThinking); const finalText = streamAssembler.finalize(evt.runId, evt.message, state.showThinking);
chatLog.finalizeAssistant(finalText, evt.runId); chatLog.finalizeAssistant(finalText, evt.runId);
noteFinalizedRun(evt.runId); noteFinalizedRun(evt.runId);
state.activeChatRunId = null; clearActiveRunIfMatch(evt.runId);
if (wasActiveRun) {
setActivityStatus(stopReason === "error" ? "error" : "idle"); setActivityStatus(stopReason === "error" ? "error" : "idle");
}
// Refresh session info to update token counts in footer // Refresh session info to update token counts in footer
void refreshSessionInfo?.(); void refreshSessionInfo?.();
} }
if (evt.state === "aborted") { if (evt.state === "aborted") {
const wasActiveRun = state.activeChatRunId === evt.runId;
chatLog.addSystem("run aborted"); chatLog.addSystem("run aborted");
streamAssembler.drop(evt.runId); streamAssembler.drop(evt.runId);
sessionRuns.delete(evt.runId); sessionRuns.delete(evt.runId);
state.activeChatRunId = null; clearActiveRunIfMatch(evt.runId);
if (wasActiveRun) {
setActivityStatus("aborted"); setActivityStatus("aborted");
void refreshSessionInfo?.();
if (isLocalRunId?.(evt.runId)) {
forgetLocalRunId?.(evt.runId);
} else {
void loadHistory?.();
} }
void refreshSessionInfo?.();
maybeRefreshHistoryForRun(evt.runId);
} }
if (evt.state === "error") { if (evt.state === "error") {
const wasActiveRun = state.activeChatRunId === evt.runId;
chatLog.addSystem(`run error: ${evt.errorMessage ?? "unknown"}`); chatLog.addSystem(`run error: ${evt.errorMessage ?? "unknown"}`);
streamAssembler.drop(evt.runId); streamAssembler.drop(evt.runId);
sessionRuns.delete(evt.runId); sessionRuns.delete(evt.runId);
state.activeChatRunId = null; clearActiveRunIfMatch(evt.runId);
if (wasActiveRun) {
setActivityStatus("error"); setActivityStatus("error");
void refreshSessionInfo?.();
if (isLocalRunId?.(evt.runId)) {
forgetLocalRunId?.(evt.runId);
} else {
void loadHistory?.();
} }
void refreshSessionInfo?.();
maybeRefreshHistoryForRun(evt.runId);
} }
tui.requestRender(); tui.requestRender();
}; };