mirror of
https://github.com/farcasclaudiu/openclaw.git
synced 2026-06-28 23:02:02 +03:00
perf(logging): split diagnostic session state module
This commit is contained in:
@@ -0,0 +1,91 @@
|
|||||||
|
export type SessionStateValue = "idle" | "processing" | "waiting";
|
||||||
|
|
||||||
|
export type SessionState = {
|
||||||
|
sessionId?: string;
|
||||||
|
sessionKey?: string;
|
||||||
|
lastActivity: number;
|
||||||
|
state: SessionStateValue;
|
||||||
|
queueDepth: number;
|
||||||
|
};
|
||||||
|
|
||||||
|
export type SessionRef = {
|
||||||
|
sessionId?: string;
|
||||||
|
sessionKey?: string;
|
||||||
|
};
|
||||||
|
|
||||||
|
export const diagnosticSessionStates = new Map<string, SessionState>();
|
||||||
|
|
||||||
|
const SESSION_STATE_TTL_MS = 30 * 60 * 1000;
|
||||||
|
const SESSION_STATE_PRUNE_INTERVAL_MS = 60 * 1000;
|
||||||
|
const SESSION_STATE_MAX_ENTRIES = 2000;
|
||||||
|
|
||||||
|
let lastSessionPruneAt = 0;
|
||||||
|
|
||||||
|
export function pruneDiagnosticSessionStates(now = Date.now(), force = false): void {
|
||||||
|
const shouldPruneForSize = diagnosticSessionStates.size > SESSION_STATE_MAX_ENTRIES;
|
||||||
|
if (!force && !shouldPruneForSize && now - lastSessionPruneAt < SESSION_STATE_PRUNE_INTERVAL_MS) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
lastSessionPruneAt = now;
|
||||||
|
|
||||||
|
for (const [key, state] of diagnosticSessionStates.entries()) {
|
||||||
|
const ageMs = now - state.lastActivity;
|
||||||
|
const isIdle = state.state === "idle";
|
||||||
|
if (isIdle && state.queueDepth <= 0 && ageMs > SESSION_STATE_TTL_MS) {
|
||||||
|
diagnosticSessionStates.delete(key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (diagnosticSessionStates.size <= SESSION_STATE_MAX_ENTRIES) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
const excess = diagnosticSessionStates.size - SESSION_STATE_MAX_ENTRIES;
|
||||||
|
const ordered = Array.from(diagnosticSessionStates.entries()).toSorted(
|
||||||
|
(a, b) => a[1].lastActivity - b[1].lastActivity,
|
||||||
|
);
|
||||||
|
for (let i = 0; i < excess; i += 1) {
|
||||||
|
const key = ordered[i]?.[0];
|
||||||
|
if (!key) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
diagnosticSessionStates.delete(key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function resolveSessionKey({ sessionKey, sessionId }: SessionRef) {
|
||||||
|
return sessionKey ?? sessionId ?? "unknown";
|
||||||
|
}
|
||||||
|
|
||||||
|
export function getDiagnosticSessionState(ref: SessionRef): SessionState {
|
||||||
|
pruneDiagnosticSessionStates();
|
||||||
|
const key = resolveSessionKey(ref);
|
||||||
|
const existing = diagnosticSessionStates.get(key);
|
||||||
|
if (existing) {
|
||||||
|
if (ref.sessionId) {
|
||||||
|
existing.sessionId = ref.sessionId;
|
||||||
|
}
|
||||||
|
if (ref.sessionKey) {
|
||||||
|
existing.sessionKey = ref.sessionKey;
|
||||||
|
}
|
||||||
|
return existing;
|
||||||
|
}
|
||||||
|
const created: SessionState = {
|
||||||
|
sessionId: ref.sessionId,
|
||||||
|
sessionKey: ref.sessionKey,
|
||||||
|
lastActivity: Date.now(),
|
||||||
|
state: "idle",
|
||||||
|
queueDepth: 0,
|
||||||
|
};
|
||||||
|
diagnosticSessionStates.set(key, created);
|
||||||
|
pruneDiagnosticSessionStates(Date.now(), true);
|
||||||
|
return created;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function getDiagnosticSessionStateCountForTest(): number {
|
||||||
|
return diagnosticSessionStates.size;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function resetDiagnosticSessionStateForTest(): void {
|
||||||
|
diagnosticSessionStates.clear();
|
||||||
|
lastSessionPruneAt = 0;
|
||||||
|
}
|
||||||
@@ -1,34 +1,34 @@
|
|||||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||||
import {
|
import {
|
||||||
getDiagnosticSessionStateCountForTest,
|
getDiagnosticSessionStateCountForTest,
|
||||||
logSessionStateChange,
|
getDiagnosticSessionState,
|
||||||
resetDiagnosticStateForTest,
|
resetDiagnosticSessionStateForTest,
|
||||||
} from "./diagnostic.js";
|
} from "./diagnostic-session-state.js";
|
||||||
|
|
||||||
describe("diagnostic session state pruning", () => {
|
describe("diagnostic session state pruning", () => {
|
||||||
beforeEach(() => {
|
beforeEach(() => {
|
||||||
vi.useFakeTimers();
|
vi.useFakeTimers();
|
||||||
resetDiagnosticStateForTest();
|
resetDiagnosticSessionStateForTest();
|
||||||
});
|
});
|
||||||
|
|
||||||
afterEach(() => {
|
afterEach(() => {
|
||||||
resetDiagnosticStateForTest();
|
resetDiagnosticSessionStateForTest();
|
||||||
vi.useRealTimers();
|
vi.useRealTimers();
|
||||||
});
|
});
|
||||||
|
|
||||||
it("evicts stale idle session states", () => {
|
it("evicts stale idle session states", () => {
|
||||||
logSessionStateChange({ sessionId: "stale-1", state: "idle" });
|
getDiagnosticSessionState({ sessionId: "stale-1" });
|
||||||
expect(getDiagnosticSessionStateCountForTest()).toBe(1);
|
expect(getDiagnosticSessionStateCountForTest()).toBe(1);
|
||||||
|
|
||||||
vi.advanceTimersByTime(31 * 60 * 1000);
|
vi.advanceTimersByTime(31 * 60 * 1000);
|
||||||
logSessionStateChange({ sessionId: "fresh-1", state: "idle" });
|
getDiagnosticSessionState({ sessionId: "fresh-1" });
|
||||||
|
|
||||||
expect(getDiagnosticSessionStateCountForTest()).toBe(1);
|
expect(getDiagnosticSessionStateCountForTest()).toBe(1);
|
||||||
});
|
});
|
||||||
|
|
||||||
it("caps tracked session states to a bounded max", () => {
|
it("caps tracked session states to a bounded max", () => {
|
||||||
for (let i = 0; i < 2001; i += 1) {
|
for (let i = 0; i < 2001; i += 1) {
|
||||||
logSessionStateChange({ sessionId: `session-${i}`, state: "idle" });
|
getDiagnosticSessionState({ sessionId: `session-${i}` });
|
||||||
}
|
}
|
||||||
|
|
||||||
expect(getDiagnosticSessionStateCountForTest()).toBe(2000);
|
expect(getDiagnosticSessionStateCountForTest()).toBe(2000);
|
||||||
|
|||||||
+20
-93
@@ -1,28 +1,17 @@
|
|||||||
import { emitDiagnosticEvent } from "../infra/diagnostic-events.js";
|
import { emitDiagnosticEvent } from "../infra/diagnostic-events.js";
|
||||||
|
import {
|
||||||
|
diagnosticSessionStates,
|
||||||
|
getDiagnosticSessionState,
|
||||||
|
getDiagnosticSessionStateCountForTest as getDiagnosticSessionStateCountForTestImpl,
|
||||||
|
pruneDiagnosticSessionStates,
|
||||||
|
resetDiagnosticSessionStateForTest,
|
||||||
|
type SessionRef,
|
||||||
|
type SessionStateValue,
|
||||||
|
} from "./diagnostic-session-state.js";
|
||||||
import { createSubsystemLogger } from "./subsystem.js";
|
import { createSubsystemLogger } from "./subsystem.js";
|
||||||
|
|
||||||
const diag = createSubsystemLogger("diagnostic");
|
const diag = createSubsystemLogger("diagnostic");
|
||||||
|
|
||||||
type SessionStateValue = "idle" | "processing" | "waiting";
|
|
||||||
|
|
||||||
type SessionState = {
|
|
||||||
sessionId?: string;
|
|
||||||
sessionKey?: string;
|
|
||||||
lastActivity: number;
|
|
||||||
state: SessionStateValue;
|
|
||||||
queueDepth: number;
|
|
||||||
};
|
|
||||||
|
|
||||||
type SessionRef = {
|
|
||||||
sessionId?: string;
|
|
||||||
sessionKey?: string;
|
|
||||||
};
|
|
||||||
|
|
||||||
const sessionStates = new Map<string, SessionState>();
|
|
||||||
const SESSION_STATE_TTL_MS = 30 * 60 * 1000;
|
|
||||||
const SESSION_STATE_PRUNE_INTERVAL_MS = 60 * 1000;
|
|
||||||
const SESSION_STATE_MAX_ENTRIES = 2000;
|
|
||||||
|
|
||||||
const webhookStats = {
|
const webhookStats = {
|
||||||
received: 0,
|
received: 0,
|
||||||
processed: 0,
|
processed: 0,
|
||||||
@@ -31,72 +20,11 @@ const webhookStats = {
|
|||||||
};
|
};
|
||||||
|
|
||||||
let lastActivityAt = 0;
|
let lastActivityAt = 0;
|
||||||
let lastSessionPruneAt = 0;
|
|
||||||
|
|
||||||
function markActivity() {
|
function markActivity() {
|
||||||
lastActivityAt = Date.now();
|
lastActivityAt = Date.now();
|
||||||
}
|
}
|
||||||
|
|
||||||
function pruneSessionStates(now = Date.now(), force = false): void {
|
|
||||||
const shouldPruneForSize = sessionStates.size > SESSION_STATE_MAX_ENTRIES;
|
|
||||||
if (!force && !shouldPruneForSize && now - lastSessionPruneAt < SESSION_STATE_PRUNE_INTERVAL_MS) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
lastSessionPruneAt = now;
|
|
||||||
|
|
||||||
for (const [key, state] of sessionStates.entries()) {
|
|
||||||
const ageMs = now - state.lastActivity;
|
|
||||||
const isIdle = state.state === "idle";
|
|
||||||
if (isIdle && state.queueDepth <= 0 && ageMs > SESSION_STATE_TTL_MS) {
|
|
||||||
sessionStates.delete(key);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (sessionStates.size <= SESSION_STATE_MAX_ENTRIES) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
const excess = sessionStates.size - SESSION_STATE_MAX_ENTRIES;
|
|
||||||
const ordered = Array.from(sessionStates.entries()).toSorted(
|
|
||||||
(a, b) => a[1].lastActivity - b[1].lastActivity,
|
|
||||||
);
|
|
||||||
for (let i = 0; i < excess; i += 1) {
|
|
||||||
const key = ordered[i]?.[0];
|
|
||||||
if (!key) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
sessionStates.delete(key);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function resolveSessionKey({ sessionKey, sessionId }: SessionRef) {
|
|
||||||
return sessionKey ?? sessionId ?? "unknown";
|
|
||||||
}
|
|
||||||
|
|
||||||
function getSessionState(ref: SessionRef): SessionState {
|
|
||||||
pruneSessionStates();
|
|
||||||
const key = resolveSessionKey(ref);
|
|
||||||
const existing = sessionStates.get(key);
|
|
||||||
if (existing) {
|
|
||||||
if (ref.sessionId) {
|
|
||||||
existing.sessionId = ref.sessionId;
|
|
||||||
}
|
|
||||||
if (ref.sessionKey) {
|
|
||||||
existing.sessionKey = ref.sessionKey;
|
|
||||||
}
|
|
||||||
return existing;
|
|
||||||
}
|
|
||||||
const created: SessionState = {
|
|
||||||
sessionId: ref.sessionId,
|
|
||||||
sessionKey: ref.sessionKey,
|
|
||||||
lastActivity: Date.now(),
|
|
||||||
state: "idle",
|
|
||||||
queueDepth: 0,
|
|
||||||
};
|
|
||||||
sessionStates.set(key, created);
|
|
||||||
pruneSessionStates(Date.now(), true);
|
|
||||||
return created;
|
|
||||||
}
|
|
||||||
|
|
||||||
export function logWebhookReceived(params: {
|
export function logWebhookReceived(params: {
|
||||||
channel: string;
|
channel: string;
|
||||||
updateType?: string;
|
updateType?: string;
|
||||||
@@ -174,7 +102,7 @@ export function logMessageQueued(params: {
|
|||||||
channel?: string;
|
channel?: string;
|
||||||
source: string;
|
source: string;
|
||||||
}) {
|
}) {
|
||||||
const state = getSessionState(params);
|
const state = getDiagnosticSessionState(params);
|
||||||
state.queueDepth += 1;
|
state.queueDepth += 1;
|
||||||
state.lastActivity = Date.now();
|
state.lastActivity = Date.now();
|
||||||
if (diag.isEnabled("debug")) {
|
if (diag.isEnabled("debug")) {
|
||||||
@@ -244,7 +172,7 @@ export function logSessionStateChange(
|
|||||||
reason?: string;
|
reason?: string;
|
||||||
},
|
},
|
||||||
) {
|
) {
|
||||||
const state = getSessionState(params);
|
const state = getDiagnosticSessionState(params);
|
||||||
const isProbeSession = state.sessionId?.startsWith("probe-") ?? false;
|
const isProbeSession = state.sessionId?.startsWith("probe-") ?? false;
|
||||||
const prevState = state.state;
|
const prevState = state.state;
|
||||||
state.state = params.state;
|
state.state = params.state;
|
||||||
@@ -274,7 +202,7 @@ export function logSessionStateChange(
|
|||||||
}
|
}
|
||||||
|
|
||||||
export function logSessionStuck(params: SessionRef & { state: SessionStateValue; ageMs: number }) {
|
export function logSessionStuck(params: SessionRef & { state: SessionStateValue; ageMs: number }) {
|
||||||
const state = getSessionState(params);
|
const state = getDiagnosticSessionState(params);
|
||||||
diag.warn(
|
diag.warn(
|
||||||
`stuck session: sessionId=${state.sessionId ?? "unknown"} sessionKey=${
|
`stuck session: sessionId=${state.sessionId ?? "unknown"} sessionKey=${
|
||||||
state.sessionKey ?? "unknown"
|
state.sessionKey ?? "unknown"
|
||||||
@@ -329,7 +257,7 @@ export function logRunAttempt(params: SessionRef & { runId: string; attempt: num
|
|||||||
}
|
}
|
||||||
|
|
||||||
export function logActiveRuns() {
|
export function logActiveRuns() {
|
||||||
const activeSessions = Array.from(sessionStates.entries())
|
const activeSessions = Array.from(diagnosticSessionStates.entries())
|
||||||
.filter(([, s]) => s.state === "processing")
|
.filter(([, s]) => s.state === "processing")
|
||||||
.map(
|
.map(
|
||||||
([id, s]) =>
|
([id, s]) =>
|
||||||
@@ -347,14 +275,14 @@ export function startDiagnosticHeartbeat() {
|
|||||||
}
|
}
|
||||||
heartbeatInterval = setInterval(() => {
|
heartbeatInterval = setInterval(() => {
|
||||||
const now = Date.now();
|
const now = Date.now();
|
||||||
pruneSessionStates(now, true);
|
pruneDiagnosticSessionStates(now, true);
|
||||||
const activeCount = Array.from(sessionStates.values()).filter(
|
const activeCount = Array.from(diagnosticSessionStates.values()).filter(
|
||||||
(s) => s.state === "processing",
|
(s) => s.state === "processing",
|
||||||
).length;
|
).length;
|
||||||
const waitingCount = Array.from(sessionStates.values()).filter(
|
const waitingCount = Array.from(diagnosticSessionStates.values()).filter(
|
||||||
(s) => s.state === "waiting",
|
(s) => s.state === "waiting",
|
||||||
).length;
|
).length;
|
||||||
const totalQueued = Array.from(sessionStates.values()).reduce(
|
const totalQueued = Array.from(diagnosticSessionStates.values()).reduce(
|
||||||
(sum, s) => sum + s.queueDepth,
|
(sum, s) => sum + s.queueDepth,
|
||||||
0,
|
0,
|
||||||
);
|
);
|
||||||
@@ -386,7 +314,7 @@ export function startDiagnosticHeartbeat() {
|
|||||||
queued: totalQueued,
|
queued: totalQueued,
|
||||||
});
|
});
|
||||||
|
|
||||||
for (const [, state] of sessionStates) {
|
for (const [, state] of diagnosticSessionStates) {
|
||||||
const ageMs = now - state.lastActivity;
|
const ageMs = now - state.lastActivity;
|
||||||
if (state.state === "processing" && ageMs > 120_000) {
|
if (state.state === "processing" && ageMs > 120_000) {
|
||||||
logSessionStuck({
|
logSessionStuck({
|
||||||
@@ -409,17 +337,16 @@ export function stopDiagnosticHeartbeat() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
export function getDiagnosticSessionStateCountForTest(): number {
|
export function getDiagnosticSessionStateCountForTest(): number {
|
||||||
return sessionStates.size;
|
return getDiagnosticSessionStateCountForTestImpl();
|
||||||
}
|
}
|
||||||
|
|
||||||
export function resetDiagnosticStateForTest(): void {
|
export function resetDiagnosticStateForTest(): void {
|
||||||
sessionStates.clear();
|
resetDiagnosticSessionStateForTest();
|
||||||
webhookStats.received = 0;
|
webhookStats.received = 0;
|
||||||
webhookStats.processed = 0;
|
webhookStats.processed = 0;
|
||||||
webhookStats.errors = 0;
|
webhookStats.errors = 0;
|
||||||
webhookStats.lastReceived = 0;
|
webhookStats.lastReceived = 0;
|
||||||
lastActivityAt = 0;
|
lastActivityAt = 0;
|
||||||
lastSessionPruneAt = 0;
|
|
||||||
stopDiagnosticHeartbeat();
|
stopDiagnosticHeartbeat();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user