mirror of
https://github.com/farcasclaudiu/openclaw.git
synced 2026-06-28 23:02:02 +03:00
fix (heartbeat/cron): preserve cron prompts for tagged interval events
This commit is contained in:
@@ -35,7 +35,7 @@ export type CronServiceDeps = {
|
|||||||
resolveSessionStorePath?: (agentId?: string) => string;
|
resolveSessionStorePath?: (agentId?: string) => string;
|
||||||
/** Path to the session store (sessions.json) for reaper use. */
|
/** Path to the session store (sessions.json) for reaper use. */
|
||||||
sessionStorePath?: string;
|
sessionStorePath?: string;
|
||||||
enqueueSystemEvent: (text: string, opts?: { agentId?: string }) => void;
|
enqueueSystemEvent: (text: string, opts?: { agentId?: string; contextKey?: string }) => void;
|
||||||
requestHeartbeatNow: (opts?: { reason?: string }) => void;
|
requestHeartbeatNow: (opts?: { reason?: string }) => void;
|
||||||
runHeartbeatOnce?: (opts?: { reason?: string; agentId?: string }) => Promise<HeartbeatRunResult>;
|
runHeartbeatOnce?: (opts?: { reason?: string; agentId?: string }) => Promise<HeartbeatRunResult>;
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -445,7 +445,10 @@ async function executeJobCore(
|
|||||||
: 'main job requires payload.kind="systemEvent"',
|
: 'main job requires payload.kind="systemEvent"',
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
state.deps.enqueueSystemEvent(text, { agentId: job.agentId });
|
state.deps.enqueueSystemEvent(text, {
|
||||||
|
agentId: job.agentId,
|
||||||
|
contextKey: `cron:${job.id}`,
|
||||||
|
});
|
||||||
if (job.wakeMode === "now" && state.deps.runHeartbeatOnce) {
|
if (job.wakeMode === "now" && state.deps.runHeartbeatOnce) {
|
||||||
const reason = `cron:${job.id}`;
|
const reason = `cron:${job.id}`;
|
||||||
const delay = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));
|
const delay = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));
|
||||||
@@ -503,7 +506,10 @@ async function executeJobCore(
|
|||||||
const prefix = "Cron";
|
const prefix = "Cron";
|
||||||
const label =
|
const label =
|
||||||
res.status === "error" ? `${prefix} (error): ${summaryText}` : `${prefix}: ${summaryText}`;
|
res.status === "error" ? `${prefix} (error): ${summaryText}` : `${prefix}: ${summaryText}`;
|
||||||
state.deps.enqueueSystemEvent(label, { agentId: job.agentId });
|
state.deps.enqueueSystemEvent(label, {
|
||||||
|
agentId: job.agentId,
|
||||||
|
contextKey: `cron:${job.id}`,
|
||||||
|
});
|
||||||
if (job.wakeMode === "now") {
|
if (job.wakeMode === "now") {
|
||||||
state.deps.requestHeartbeatNow({ reason: `cron:${job.id}` });
|
state.deps.requestHeartbeatNow({ reason: `cron:${job.id}` });
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -64,7 +64,7 @@ export function buildGatewayCronService(params: {
|
|||||||
cfg: runtimeConfig,
|
cfg: runtimeConfig,
|
||||||
agentId,
|
agentId,
|
||||||
});
|
});
|
||||||
enqueueSystemEvent(text, { sessionKey });
|
enqueueSystemEvent(text, { sessionKey, contextKey: opts?.contextKey });
|
||||||
},
|
},
|
||||||
requestHeartbeatNow,
|
requestHeartbeatNow,
|
||||||
runHeartbeatOnce: async (opts) => {
|
runHeartbeatOnce: async (opts) => {
|
||||||
|
|||||||
@@ -59,7 +59,7 @@ import {
|
|||||||
resolveHeartbeatDeliveryTarget,
|
resolveHeartbeatDeliveryTarget,
|
||||||
resolveHeartbeatSenderContext,
|
resolveHeartbeatSenderContext,
|
||||||
} from "./outbound/targets.js";
|
} from "./outbound/targets.js";
|
||||||
import { peekSystemEvents } from "./system-events.js";
|
import { peekSystemEventEntries } from "./system-events.js";
|
||||||
|
|
||||||
type HeartbeatDeps = OutboundSendDeps &
|
type HeartbeatDeps = OutboundSendDeps &
|
||||||
ChannelHeartbeatDeps & {
|
ChannelHeartbeatDeps & {
|
||||||
@@ -487,11 +487,23 @@ export async function runHeartbeatOnce(opts: {
|
|||||||
// If so, use a specialized prompt that instructs the model to relay the result
|
// If so, use a specialized prompt that instructs the model to relay the result
|
||||||
// instead of the standard heartbeat prompt with "reply HEARTBEAT_OK".
|
// instead of the standard heartbeat prompt with "reply HEARTBEAT_OK".
|
||||||
const isExecEvent = opts.reason === "exec-event";
|
const isExecEvent = opts.reason === "exec-event";
|
||||||
const isCronEvent = Boolean(opts.reason?.startsWith("cron:"));
|
const pendingEventEntries = peekSystemEventEntries(sessionKey);
|
||||||
const pendingEvents = isExecEvent || isCronEvent ? peekSystemEvents(sessionKey) : [];
|
const hasTaggedCronEvents = pendingEventEntries.some((event) =>
|
||||||
const cronEvents = pendingEvents.filter((evt) => isCronSystemEvent(evt));
|
event.contextKey?.startsWith("cron:"),
|
||||||
|
);
|
||||||
|
const shouldInspectPendingEvents = isExecEvent || isCronEventReason || hasTaggedCronEvents;
|
||||||
|
const pendingEvents = shouldInspectPendingEvents
|
||||||
|
? pendingEventEntries.map((event) => event.text)
|
||||||
|
: [];
|
||||||
|
const cronEvents = pendingEventEntries
|
||||||
|
.filter(
|
||||||
|
(event) =>
|
||||||
|
(isCronEventReason || event.contextKey?.startsWith("cron:")) &&
|
||||||
|
isCronSystemEvent(event.text),
|
||||||
|
)
|
||||||
|
.map((event) => event.text);
|
||||||
const hasExecCompletion = pendingEvents.some(isExecCompletionEvent);
|
const hasExecCompletion = pendingEvents.some(isExecCompletionEvent);
|
||||||
const hasCronEvents = isCronEvent && cronEvents.length > 0;
|
const hasCronEvents = cronEvents.length > 0;
|
||||||
const prompt = hasExecCompletion
|
const prompt = hasExecCompletion
|
||||||
? EXEC_EVENT_PROMPT
|
? EXEC_EVENT_PROMPT
|
||||||
: hasCronEvents
|
: hasCronEvents
|
||||||
|
|||||||
@@ -2,7 +2,7 @@
|
|||||||
// prefixed to the next prompt. We intentionally avoid persistence to keep
|
// prefixed to the next prompt. We intentionally avoid persistence to keep
|
||||||
// events ephemeral. Events are session-scoped and require an explicit key.
|
// events ephemeral. Events are session-scoped and require an explicit key.
|
||||||
|
|
||||||
export type SystemEvent = { text: string; ts: number };
|
export type SystemEvent = { text: string; ts: number; contextKey?: string | null };
|
||||||
|
|
||||||
const MAX_EVENTS = 20;
|
const MAX_EVENTS = 20;
|
||||||
|
|
||||||
@@ -65,12 +65,17 @@ export function enqueueSystemEvent(text: string, options: SystemEventOptions) {
|
|||||||
if (!cleaned) {
|
if (!cleaned) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
entry.lastContextKey = normalizeContextKey(options?.contextKey);
|
const normalizedContextKey = normalizeContextKey(options?.contextKey);
|
||||||
|
entry.lastContextKey = normalizedContextKey;
|
||||||
if (entry.lastText === cleaned) {
|
if (entry.lastText === cleaned) {
|
||||||
return;
|
return;
|
||||||
} // skip consecutive duplicates
|
} // skip consecutive duplicates
|
||||||
entry.lastText = cleaned;
|
entry.lastText = cleaned;
|
||||||
entry.queue.push({ text: cleaned, ts: Date.now() });
|
entry.queue.push({
|
||||||
|
text: cleaned,
|
||||||
|
ts: Date.now(),
|
||||||
|
contextKey: normalizedContextKey,
|
||||||
|
});
|
||||||
if (entry.queue.length > MAX_EVENTS) {
|
if (entry.queue.length > MAX_EVENTS) {
|
||||||
entry.queue.shift();
|
entry.queue.shift();
|
||||||
}
|
}
|
||||||
@@ -94,9 +99,13 @@ export function drainSystemEvents(sessionKey: string): string[] {
|
|||||||
return drainSystemEventEntries(sessionKey).map((event) => event.text);
|
return drainSystemEventEntries(sessionKey).map((event) => event.text);
|
||||||
}
|
}
|
||||||
|
|
||||||
export function peekSystemEvents(sessionKey: string): string[] {
|
export function peekSystemEventEntries(sessionKey: string): SystemEvent[] {
|
||||||
const key = requireSessionKey(sessionKey);
|
const key = requireSessionKey(sessionKey);
|
||||||
return queues.get(key)?.queue.map((e) => e.text) ?? [];
|
return queues.get(key)?.queue.map((event) => ({ ...event })) ?? [];
|
||||||
|
}
|
||||||
|
|
||||||
|
export function peekSystemEvents(sessionKey: string): string[] {
|
||||||
|
return peekSystemEventEntries(sessionKey).map((event) => event.text);
|
||||||
}
|
}
|
||||||
|
|
||||||
export function hasSystemEvents(sessionKey: string) {
|
export function hasSystemEvents(sessionKey: string) {
|
||||||
|
|||||||
Reference in New Issue
Block a user