mirror of
https://github.com/farcasclaudiu/openclaw.git
synced 2026-06-29 03:01:50 +03:00
fix(gateway): drain active turns before restart to prevent message loss (#13931)
* fix(gateway): drain active turns before restart to prevent message loss

  On SIGUSR1 restart, the gateway now waits up to 30s for in-flight agent turns to complete before tearing down the server. This prevents buffered messages from being dropped when config.patch or update triggers a restart while agents are mid-turn.

  Changes:
  - command-queue.ts: add getActiveTaskCount() and waitForActiveTasks() helpers to track and wait on active lane tasks
  - run-loop.ts: on restart signal, drain active tasks before server.close() with a 30s timeout; extend force-exit timer accordingly
  - command-queue.test.ts: update imports for new exports

  Fixes #13883

* fix(queue): snapshot active tasks for restart drain

---------

Co-authored-by: Elonito <0xRaini@users.noreply.github.com>
Co-authored-by: Tak Hoffman <781889+Takhoffman@users.noreply.github.com>
This commit is contained in:
@@ -6,6 +6,7 @@ import {
|
|||||||
isGatewaySigusr1RestartExternallyAllowed,
|
isGatewaySigusr1RestartExternallyAllowed,
|
||||||
} from "../../infra/restart.js";
|
} from "../../infra/restart.js";
|
||||||
import { createSubsystemLogger } from "../../logging/subsystem.js";
|
import { createSubsystemLogger } from "../../logging/subsystem.js";
|
||||||
|
import { getActiveTaskCount, waitForActiveTasks } from "../../process/command-queue.js";
|
||||||
|
|
||||||
const gatewayLog = createSubsystemLogger("gateway");
|
const gatewayLog = createSubsystemLogger("gateway");
|
||||||
|
|
||||||
@@ -26,6 +27,9 @@ export async function runGatewayLoop(params: {
|
|||||||
process.removeListener("SIGUSR1", onSigusr1);
|
process.removeListener("SIGUSR1", onSigusr1);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const DRAIN_TIMEOUT_MS = 30_000;
|
||||||
|
const SHUTDOWN_TIMEOUT_MS = 5_000;
|
||||||
|
|
||||||
const request = (action: GatewayRunSignalAction, signal: string) => {
|
const request = (action: GatewayRunSignalAction, signal: string) => {
|
||||||
if (shuttingDown) {
|
if (shuttingDown) {
|
||||||
gatewayLog.info(`received ${signal} during shutdown; ignoring`);
|
gatewayLog.info(`received ${signal} during shutdown; ignoring`);
|
||||||
@@ -35,14 +39,33 @@ export async function runGatewayLoop(params: {
|
|||||||
const isRestart = action === "restart";
|
const isRestart = action === "restart";
|
||||||
gatewayLog.info(`received ${signal}; ${isRestart ? "restarting" : "shutting down"}`);
|
gatewayLog.info(`received ${signal}; ${isRestart ? "restarting" : "shutting down"}`);
|
||||||
|
|
||||||
|
// Allow extra time for draining active turns on restart.
|
||||||
|
const forceExitMs = isRestart ? DRAIN_TIMEOUT_MS + SHUTDOWN_TIMEOUT_MS : SHUTDOWN_TIMEOUT_MS;
|
||||||
const forceExitTimer = setTimeout(() => {
|
const forceExitTimer = setTimeout(() => {
|
||||||
gatewayLog.error("shutdown timed out; exiting without full cleanup");
|
gatewayLog.error("shutdown timed out; exiting without full cleanup");
|
||||||
cleanupSignals();
|
cleanupSignals();
|
||||||
params.runtime.exit(0);
|
params.runtime.exit(0);
|
||||||
}, 5000);
|
}, forceExitMs);
|
||||||
|
|
||||||
void (async () => {
|
void (async () => {
|
||||||
try {
|
try {
|
||||||
|
// On restart, wait for in-flight agent turns to finish before
|
||||||
|
// tearing down the server so buffered messages are delivered.
|
||||||
|
if (isRestart) {
|
||||||
|
const activeTasks = getActiveTaskCount();
|
||||||
|
if (activeTasks > 0) {
|
||||||
|
gatewayLog.info(
|
||||||
|
`draining ${activeTasks} active task(s) before restart (timeout ${DRAIN_TIMEOUT_MS}ms)`,
|
||||||
|
);
|
||||||
|
const { drained } = await waitForActiveTasks(DRAIN_TIMEOUT_MS);
|
||||||
|
if (drained) {
|
||||||
|
gatewayLog.info("all active tasks drained");
|
||||||
|
} else {
|
||||||
|
gatewayLog.warn("drain timeout reached; proceeding with restart");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
await server?.close({
|
await server?.close({
|
||||||
reason: isRestart ? "gateway restarting" : "gateway stopping",
|
reason: isRestart ? "gateway restarting" : "gateway stopping",
|
||||||
restartExpectedMs: isRestart ? 1500 : null,
|
restartExpectedMs: isRestart ? 1500 : null,
|
||||||
|
|||||||
@@ -16,7 +16,14 @@ vi.mock("../logging/diagnostic.js", () => ({
|
|||||||
diagnosticLogger: diagnosticMocks.diag,
|
diagnosticLogger: diagnosticMocks.diag,
|
||||||
}));
|
}));
|
||||||
|
|
||||||
import { enqueueCommand, getQueueSize } from "./command-queue.js";
|
import {
|
||||||
|
enqueueCommand,
|
||||||
|
enqueueCommandInLane,
|
||||||
|
getActiveTaskCount,
|
||||||
|
getQueueSize,
|
||||||
|
setCommandLaneConcurrency,
|
||||||
|
waitForActiveTasks,
|
||||||
|
} from "./command-queue.js";
|
||||||
|
|
||||||
describe("command queue", () => {
|
describe("command queue", () => {
|
||||||
beforeEach(() => {
|
beforeEach(() => {
|
||||||
@@ -85,4 +92,106 @@ describe("command queue", () => {
|
|||||||
expect(waited as number).toBeGreaterThanOrEqual(5);
|
expect(waited as number).toBeGreaterThanOrEqual(5);
|
||||||
expect(queuedAhead).toBe(0);
|
expect(queuedAhead).toBe(0);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("getActiveTaskCount returns count of currently executing tasks", async () => {
|
||||||
|
let resolve1!: () => void;
|
||||||
|
const blocker = new Promise<void>((r) => {
|
||||||
|
resolve1 = r;
|
||||||
|
});
|
||||||
|
|
||||||
|
const task = enqueueCommand(async () => {
|
||||||
|
await blocker;
|
||||||
|
});
|
||||||
|
|
||||||
|
// Give the event loop a tick for the task to start.
|
||||||
|
await new Promise((r) => setTimeout(r, 5));
|
||||||
|
expect(getActiveTaskCount()).toBe(1);
|
||||||
|
|
||||||
|
resolve1();
|
||||||
|
await task;
|
||||||
|
expect(getActiveTaskCount()).toBe(0);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("waitForActiveTasks resolves immediately when no tasks are active", async () => {
|
||||||
|
const { drained } = await waitForActiveTasks(1000);
|
||||||
|
expect(drained).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("waitForActiveTasks waits for active tasks to finish", async () => {
|
||||||
|
let resolve1!: () => void;
|
||||||
|
const blocker = new Promise<void>((r) => {
|
||||||
|
resolve1 = r;
|
||||||
|
});
|
||||||
|
|
||||||
|
const task = enqueueCommand(async () => {
|
||||||
|
await blocker;
|
||||||
|
});
|
||||||
|
|
||||||
|
// Give the task a tick to start.
|
||||||
|
await new Promise((r) => setTimeout(r, 5));
|
||||||
|
|
||||||
|
const drainPromise = waitForActiveTasks(5000);
|
||||||
|
|
||||||
|
// Resolve the blocker after a short delay.
|
||||||
|
setTimeout(() => resolve1(), 50);
|
||||||
|
|
||||||
|
const { drained } = await drainPromise;
|
||||||
|
expect(drained).toBe(true);
|
||||||
|
|
||||||
|
await task;
|
||||||
|
});
|
||||||
|
|
||||||
|
it("waitForActiveTasks returns drained=false on timeout", async () => {
|
||||||
|
let resolve1!: () => void;
|
||||||
|
const blocker = new Promise<void>((r) => {
|
||||||
|
resolve1 = r;
|
||||||
|
});
|
||||||
|
|
||||||
|
const task = enqueueCommand(async () => {
|
||||||
|
await blocker;
|
||||||
|
});
|
||||||
|
|
||||||
|
await new Promise((r) => setTimeout(r, 5));
|
||||||
|
|
||||||
|
const { drained } = await waitForActiveTasks(50);
|
||||||
|
expect(drained).toBe(false);
|
||||||
|
|
||||||
|
resolve1();
|
||||||
|
await task;
|
||||||
|
});
|
||||||
|
|
||||||
|
it("waitForActiveTasks ignores tasks that start after the call", async () => {
|
||||||
|
const lane = `drain-snapshot-${Date.now()}-${Math.random().toString(16).slice(2)}`;
|
||||||
|
setCommandLaneConcurrency(lane, 2);
|
||||||
|
|
||||||
|
let resolve1!: () => void;
|
||||||
|
const blocker1 = new Promise<void>((r) => {
|
||||||
|
resolve1 = r;
|
||||||
|
});
|
||||||
|
let resolve2!: () => void;
|
||||||
|
const blocker2 = new Promise<void>((r) => {
|
||||||
|
resolve2 = r;
|
||||||
|
});
|
||||||
|
|
||||||
|
const first = enqueueCommandInLane(lane, async () => {
|
||||||
|
await blocker1;
|
||||||
|
});
|
||||||
|
await new Promise((r) => setTimeout(r, 5));
|
||||||
|
|
||||||
|
const drainPromise = waitForActiveTasks(2000);
|
||||||
|
|
||||||
|
// Starts after waitForActiveTasks snapshot and should not block drain completion.
|
||||||
|
const second = enqueueCommandInLane(lane, async () => {
|
||||||
|
await blocker2;
|
||||||
|
});
|
||||||
|
await new Promise((r) => setTimeout(r, 5));
|
||||||
|
expect(getActiveTaskCount()).toBeGreaterThanOrEqual(2);
|
||||||
|
|
||||||
|
resolve1();
|
||||||
|
const { drained } = await drainPromise;
|
||||||
|
expect(drained).toBe(true);
|
||||||
|
|
||||||
|
resolve2();
|
||||||
|
await Promise.all([first, second]);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -19,11 +19,13 @@ type LaneState = {
|
|||||||
lane: string;
|
lane: string;
|
||||||
queue: QueueEntry[];
|
queue: QueueEntry[];
|
||||||
active: number;
|
active: number;
|
||||||
|
activeTaskIds: Set<number>;
|
||||||
maxConcurrent: number;
|
maxConcurrent: number;
|
||||||
draining: boolean;
|
draining: boolean;
|
||||||
};
|
};
|
||||||
|
|
||||||
const lanes = new Map<string, LaneState>();
|
const lanes = new Map<string, LaneState>();
|
||||||
|
let nextTaskId = 1;
|
||||||
|
|
||||||
function getLaneState(lane: string): LaneState {
|
function getLaneState(lane: string): LaneState {
|
||||||
const existing = lanes.get(lane);
|
const existing = lanes.get(lane);
|
||||||
@@ -34,6 +36,7 @@ function getLaneState(lane: string): LaneState {
|
|||||||
lane,
|
lane,
|
||||||
queue: [],
|
queue: [],
|
||||||
active: 0,
|
active: 0,
|
||||||
|
activeTaskIds: new Set(),
|
||||||
maxConcurrent: 1,
|
maxConcurrent: 1,
|
||||||
draining: false,
|
draining: false,
|
||||||
};
|
};
|
||||||
@@ -59,12 +62,15 @@ function drainLane(lane: string) {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
logLaneDequeue(lane, waitedMs, state.queue.length);
|
logLaneDequeue(lane, waitedMs, state.queue.length);
|
||||||
|
const taskId = nextTaskId++;
|
||||||
state.active += 1;
|
state.active += 1;
|
||||||
|
state.activeTaskIds.add(taskId);
|
||||||
void (async () => {
|
void (async () => {
|
||||||
const startTime = Date.now();
|
const startTime = Date.now();
|
||||||
try {
|
try {
|
||||||
const result = await entry.task();
|
const result = await entry.task();
|
||||||
state.active -= 1;
|
state.active -= 1;
|
||||||
|
state.activeTaskIds.delete(taskId);
|
||||||
diag.debug(
|
diag.debug(
|
||||||
`lane task done: lane=${lane} durationMs=${Date.now() - startTime} active=${state.active} queued=${state.queue.length}`,
|
`lane task done: lane=${lane} durationMs=${Date.now() - startTime} active=${state.active} queued=${state.queue.length}`,
|
||||||
);
|
);
|
||||||
@@ -72,6 +78,7 @@ function drainLane(lane: string) {
|
|||||||
entry.resolve(result);
|
entry.resolve(result);
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
state.active -= 1;
|
state.active -= 1;
|
||||||
|
state.activeTaskIds.delete(taskId);
|
||||||
const isProbeLane = lane.startsWith("auth-probe:") || lane.startsWith("session:probe-");
|
const isProbeLane = lane.startsWith("auth-probe:") || lane.startsWith("session:probe-");
|
||||||
if (!isProbeLane) {
|
if (!isProbeLane) {
|
||||||
diag.error(
|
diag.error(
|
||||||
@@ -158,3 +165,67 @@ export function clearCommandLane(lane: string = CommandLane.Main) {
|
|||||||
state.queue.length = 0;
|
state.queue.length = 0;
|
||||||
return removed;
|
return removed;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Count the tasks that are executing right now, summed over every lane.
 *
 * Only each lane's `active` counter contributes; entries that are still
 * sitting in a lane's queue and have not been started are not counted.
 *
 * @returns The combined `active` count of all known lanes (0 when there are no lanes).
 */
export function getActiveTaskCount(): number {
  return Array.from(lanes.values()).reduce(
    (runningTotal, laneState) => runningTotal + laneState.active,
    0,
  );
}
|
||||||
|
|
||||||
|
/**
 * Wait for the tasks that are executing at the moment of the call to finish.
 *
 * The ids of every lane's active tasks are snapshotted synchronously. The
 * returned promise then polls and resolves with:
 * - `{ drained: true }` as soon as none of the snapshotted ids is still
 *   listed as active in any lane (immediately if nothing was active), or
 * - `{ drained: false }` once `timeoutMs` has elapsed while at least one
 *   snapshotted task is still running.
 *
 * Tasks that start after this call are ignored, so a steady stream of new
 * work cannot keep the drain from completing. The promise never rejects.
 *
 * @param timeoutMs Maximum time to wait, in milliseconds.
 * @returns A promise for `{ drained }`, telling whether the snapshotted tasks
 *   finished before the timeout.
 */
export function waitForActiveTasks(timeoutMs: number): Promise<{ drained: boolean }> {
  const POLL_INTERVAL_MS = 250;
  const deadline = Date.now() + timeoutMs;

  // Snapshot the ids that are active right now; only these are waited on.
  const activeAtStart = new Set<number>();
  for (const state of lanes.values()) {
    for (const taskId of state.activeTaskIds) {
      activeAtStart.add(taskId);
    }
  }

  // True while at least one snapshotted task is still active in some lane.
  const hasPendingSnapshotTask = (): boolean => {
    for (const state of lanes.values()) {
      for (const taskId of state.activeTaskIds) {
        if (activeAtStart.has(taskId)) {
          return true;
        }
      }
    }
    return false;
  };

  return new Promise((resolve) => {
    const check = () => {
      if (!hasPendingSnapshotTask()) {
        resolve({ drained: true });
        return;
      }

      const remainingMs = deadline - Date.now();
      if (remainingMs <= 0) {
        resolve({ drained: false });
        return;
      }

      // Never sleep past the deadline: a fixed poll interval would let the
      // call overshoot `timeoutMs` by up to POLL_INTERVAL_MS. The comparison
      // form (rather than Math.min) keeps the regular interval when
      // `remainingMs` is NaN or Infinity.
      const delayMs = remainingMs < POLL_INTERVAL_MS ? remainingMs : POLL_INTERVAL_MS;
      setTimeout(check, delayMs);
    };
    check();
  });
}
|
||||||
|
|||||||
Reference in New Issue
Block a user