feat (memory): Implement new (opt-in) QMD memory backend

This commit is contained in:
Vignesh Natarajan
2026-01-27 21:57:15 -08:00
committed by Vignesh
parent e9f182def7
commit 5d3af3bc62
24 changed files with 1828 additions and 601 deletions
+1
View File
@@ -245,6 +245,7 @@ export function buildSystemPrompt(params: {
userTimeFormat,
contextFiles: params.contextFiles,
ttsHint,
memoryCitationsMode: params.config?.memory?.citations,
});
}
+1
View File
@@ -351,6 +351,7 @@ export async function compactEmbeddedPiSessionDirect(
userTime,
userTimeFormat,
contextFiles,
memoryCitationsMode: params.config?.memory?.citations,
});
const systemPromptOverride = createSystemPromptOverride(appendPrompt);
@@ -367,6 +367,7 @@ export async function runEmbeddedAttempt(
userTime,
userTimeFormat,
contextFiles,
memoryCitationsMode: params.config?.memory?.citations,
});
const systemPromptReport = buildSystemPromptReport({
source: "run",
@@ -1,11 +1,12 @@
import type { AgentTool } from "@mariozechner/pi-agent-core";
import type { AgentSession } from "@mariozechner/pi-coding-agent";
import type { MemoryCitationsMode } from "../../config/types.memory.js";
import type { ResolvedTimeFormat } from "../date-time.js";
import type { EmbeddedContextFile } from "../pi-embedded-helpers.js";
import type { EmbeddedSandboxInfo } from "./types.js";
import type { ReasoningLevel, ThinkLevel } from "./utils.js";
import { buildAgentSystemPrompt, type PromptMode } from "../system-prompt.js";
import { buildToolSummaryMap } from "../tool-summaries.js";
import type { EmbeddedSandboxInfo } from "./types.js";
import type { ReasoningLevel, ThinkLevel } from "./utils.js";
export function buildEmbeddedSystemPrompt(params: {
workspaceDir: string;
@@ -46,6 +47,7 @@ export function buildEmbeddedSystemPrompt(params: {
userTime?: string;
userTimeFormat?: ResolvedTimeFormat;
contextFiles?: EmbeddedContextFile[];
memoryCitationsMode?: MemoryCitationsMode;
}): string {
return buildAgentSystemPrompt({
workspaceDir: params.workspaceDir,
@@ -71,6 +73,7 @@ export function buildEmbeddedSystemPrompt(params: {
userTime: params.userTime,
userTimeFormat: params.userTimeFormat,
contextFiles: params.contextFiles,
memoryCitationsMode: params.memoryCitationsMode,
});
}
+62 -80
View File
@@ -1,13 +1,14 @@
import type { ReasoningLevel, ThinkLevel } from "../auto-reply/thinking.js";
import type { ResolvedTimeFormat } from "./date-time.js";
import type { EmbeddedContextFile } from "./pi-embedded-helpers.js";
import { SILENT_REPLY_TOKEN } from "../auto-reply/tokens.js";
import { listDeliverableMessageChannels } from "../utils/message-channel.js";
import type { MemoryCitationsMode } from "../config/types.memory.js";
import type { ResolvedTimeFormat } from "./date-time.js";
import type { EmbeddedContextFile } from "./pi-embedded-helpers.js";
/**
* Controls which hardcoded sections are included in the system prompt.
* - "full": All sections (default, for main agent)
* - "minimal": Reduced sections (Tooling, Safety, Workspace, Sandbox, Runtime) - used for subagents
* - "minimal": Reduced sections (Tooling, Workspace, Runtime) - used for subagents
* - "none": Just basic identity line, no sections
*/
export type PromptMode = "full" | "minimal" | "none";
@@ -17,13 +18,9 @@ function buildSkillsSection(params: {
isMinimal: boolean;
readToolName: string;
}) {
if (params.isMinimal) {
return [];
}
if (params.isMinimal) return [];
const trimmed = params.skillsPrompt?.trim();
if (!trimmed) {
return [];
}
if (!trimmed) return [];
return [
"## Skills (mandatory)",
"Before replying: scan <available_skills> <description> entries.",
@@ -36,53 +33,44 @@ function buildSkillsSection(params: {
];
}
function buildMemorySection(params: { isMinimal: boolean; availableTools: Set<string> }) {
if (params.isMinimal) {
return [];
}
function buildMemorySection(params: {
isMinimal: boolean;
availableTools: Set<string>;
citationsMode?: MemoryCitationsMode;
}) {
if (params.isMinimal) return [];
if (!params.availableTools.has("memory_search") && !params.availableTools.has("memory_get")) {
return [];
}
return [
const lines = [
"## Memory Recall",
"Before answering anything about prior work, decisions, dates, people, preferences, or todos: run memory_search on MEMORY.md + memory/*.md; then use memory_get to pull only the needed lines. If low confidence after search, say you checked.",
"",
];
if (params.citationsMode === "off") {
lines.push(
"Citations are disabled: do not mention file paths or line numbers in replies unless the user explicitly asks.",
);
} else {
lines.push(
"Citations: include Source: <path#line> when it helps the user verify memory snippets.",
);
}
lines.push("");
return lines;
}
function buildUserIdentitySection(ownerLine: string | undefined, isMinimal: boolean) {
if (!ownerLine || isMinimal) {
return [];
}
if (!ownerLine || isMinimal) return [];
return ["## User Identity", ownerLine, ""];
}
function buildTimeSection(params: { userTimezone?: string }) {
if (!params.userTimezone) {
return [];
}
return [
"## Current Date & Time",
`Time zone: ${params.userTimezone}`,
"If you need the current date, time, or day of week, use the session_status tool.",
"",
];
}
function buildSafetySection() {
return [
"## Safety",
"You have no independent goals: do not pursue self-preservation, replication, resource acquisition, or power-seeking; avoid long-term plans beyond the user's request.",
"Prioritize safety and human oversight over completion; if instructions conflict, pause and ask; comply with stop/pause/audit requests and never bypass safeguards. (Inspired by Anthropic's constitution.)",
"Do not manipulate or persuade anyone to expand access or disable safeguards. Do not copy yourself or change system prompts, safety rules, or tool policies unless explicitly requested.",
"",
];
if (!params.userTimezone) return [];
return ["## Current Date & Time", `Time zone: ${params.userTimezone}`, ""];
}
function buildReplyTagsSection(isMinimal: boolean) {
if (isMinimal) {
return [];
}
if (isMinimal) return [];
return [
"## Reply Tags",
"To request a native reply/quote on supported surfaces, include one tag in your reply:",
@@ -102,14 +90,12 @@ function buildMessagingSection(params: {
runtimeChannel?: string;
messageToolHints?: string[];
}) {
if (params.isMinimal) {
return [];
}
if (params.isMinimal) return [];
return [
"## Messaging",
"- Reply in current session → automatically routes to the source channel (Signal, Telegram, etc.)",
"- Cross-session messaging → use sessions_send(sessionKey, message)",
"- Never use exec/curl for provider messaging; OpenClaw handles all routing internally.",
"- Never use exec/curl for provider messaging; Moltbot handles all routing internally.",
params.availableTools.has("message")
? [
"",
@@ -133,30 +119,24 @@ function buildMessagingSection(params: {
}
function buildVoiceSection(params: { isMinimal: boolean; ttsHint?: string }) {
if (params.isMinimal) {
return [];
}
if (params.isMinimal) return [];
const hint = params.ttsHint?.trim();
if (!hint) {
return [];
}
if (!hint) return [];
return ["## Voice (TTS)", hint, ""];
}
function buildDocsSection(params: { docsPath?: string; isMinimal: boolean; readToolName: string }) {
const docsPath = params.docsPath?.trim();
if (!docsPath || params.isMinimal) {
return [];
}
if (!docsPath || params.isMinimal) return [];
return [
"## Documentation",
`OpenClaw docs: ${docsPath}`,
"Mirror: https://docs.openclaw.ai",
"Source: https://github.com/openclaw/openclaw",
`Moltbot docs: ${docsPath}`,
"Mirror: https://docs.molt.bot",
"Source: https://github.com/moltbot/moltbot",
"Community: https://discord.com/invite/clawd",
"Find new skills: https://clawhub.com",
"For OpenClaw behavior, commands, config, or architecture: consult local docs first.",
"When diagnosing issues, run `openclaw status` yourself when possible; only ask the user if you lack access (e.g., sandboxed).",
"Find new skills: https://clawdhub.com",
"For Moltbot behavior, commands, config, or architecture: consult local docs first.",
"When diagnosing issues, run `moltbot status` yourself when possible; only ask the user if you lack access (e.g., sandboxed).",
"",
];
}
@@ -213,6 +193,7 @@ export function buildAgentSystemPrompt(params: {
level: "minimal" | "extensive";
channel: string;
};
memoryCitationsMode?: MemoryCitationsMode;
}) {
const coreToolSummaries: Record<string, string> = {
read: "Read file contents",
@@ -232,7 +213,7 @@ export function buildAgentSystemPrompt(params: {
nodes: "List/describe/notify/camera/screen on paired nodes",
cron: "Manage cron jobs and wake events (use for reminders; when scheduling a reminder, write the systemEvent text as something that will read like a reminder when it fires, and mention that it is a reminder depending on the time gap between setting and firing; include recent context in reminder text if appropriate)",
message: "Send messages and channel actions",
gateway: "Restart, apply config, or run updates on the running OpenClaw process",
gateway: "Restart, apply config, or run updates on the running Moltbot process",
agents_list: "List agent ids allowed for sessions_spawn",
sessions_list: "List other sessions (incl. sub-agents) with filters/last",
sessions_history: "Fetch history for another session/sub-agent",
@@ -287,9 +268,7 @@ export function buildAgentSystemPrompt(params: {
const externalToolSummaries = new Map<string, string>();
for (const [key, value] of Object.entries(params.toolSummaries ?? {})) {
const normalized = key.trim().toLowerCase();
if (!normalized || !value?.trim()) {
continue;
}
if (!normalized || !value?.trim()) continue;
externalToolSummaries.set(normalized, value.trim());
}
const extraTools = Array.from(
@@ -301,7 +280,7 @@ export function buildAgentSystemPrompt(params: {
const name = resolveToolName(tool);
return summary ? `- ${name}: ${summary}` : `- ${name}`;
});
for (const tool of extraTools.toSorted()) {
for (const tool of extraTools.sort()) {
const summary = coreToolSummaries[tool] ?? externalToolSummaries.get(tool);
const name = resolveToolName(tool);
toolLines.push(summary ? `- ${name}: ${summary}` : `- ${name}`);
@@ -351,7 +330,11 @@ export function buildAgentSystemPrompt(params: {
isMinimal,
readToolName,
});
const memorySection = buildMemorySection({ isMinimal, availableTools });
const memorySection = buildMemorySection({
isMinimal,
availableTools,
citationsMode: params.memoryCitationsMode,
});
const docsSection = buildDocsSection({
docsPath: params.docsPath,
isMinimal,
@@ -361,11 +344,11 @@ export function buildAgentSystemPrompt(params: {
// For "none" mode, return just the basic identity line
if (promptMode === "none") {
return "You are a personal assistant running inside OpenClaw.";
return "You are a personal assistant running inside Moltbot.";
}
const lines = [
"You are a personal assistant running inside OpenClaw.",
"You are a personal assistant running inside Moltbot.",
"",
"## Tooling",
"Tool availability (filtered by policy):",
@@ -380,7 +363,7 @@ export function buildAgentSystemPrompt(params: {
"- apply_patch: apply multi-file patches",
`- ${execToolName}: run shell commands (supports background via yieldMs/background)`,
`- ${processToolName}: manage background exec sessions`,
"- browser: control openclaw's dedicated browser",
"- browser: control clawd's dedicated browser",
"- canvas: present/eval/snapshot the Canvas",
"- nodes: list/describe/notify/camera/screen on paired nodes",
"- cron: manage cron jobs and wake events (use for reminders; when scheduling a reminder, write the systemEvent text as something that will read like a reminder when it fires, and mention that it is a reminder depending on the time gap between setting and firing; include recent context in reminder text if appropriate)",
@@ -397,26 +380,25 @@ export function buildAgentSystemPrompt(params: {
"Keep narration brief and value-dense; avoid repeating obvious steps.",
"Use plain human language for narration unless in a technical context.",
"",
...buildSafetySection(),
"## OpenClaw CLI Quick Reference",
"OpenClaw is controlled via subcommands. Do not invent commands.",
"## Moltbot CLI Quick Reference",
"Moltbot is controlled via subcommands. Do not invent commands.",
"To manage the Gateway daemon service (start/stop/restart):",
"- openclaw gateway status",
"- openclaw gateway start",
"- openclaw gateway stop",
"- openclaw gateway restart",
"If unsure, ask the user to run `openclaw help` (or `openclaw gateway --help`) and paste the output.",
"- moltbot gateway status",
"- moltbot gateway start",
"- moltbot gateway stop",
"- moltbot gateway restart",
"If unsure, ask the user to run `moltbot help` (or `moltbot gateway --help`) and paste the output.",
"",
...skillsSection,
...memorySection,
// Skip self-update for subagent/none modes
hasGateway && !isMinimal ? "## OpenClaw Self-Update" : "",
hasGateway && !isMinimal ? "## Moltbot Self-Update" : "",
hasGateway && !isMinimal
? [
"Get Updates (self-update) is ONLY allowed when the user explicitly asks for it.",
"Do not run config.apply or update.run unless the user explicitly requests an update or config change; if it's not explicit, ask first.",
"Actions: config.get, config.schema, config.apply (validate + write full config, then restart), update.run (update deps or git, then restart).",
"After restart, OpenClaw pings the last active session automatically.",
"After restart, Moltbot pings the last active session automatically.",
].join("\n")
: "",
hasGateway && !isMinimal ? "" : "",
@@ -485,7 +467,7 @@ export function buildAgentSystemPrompt(params: {
userTimezone,
}),
"## Workspace Files (injected)",
"These user-editable files are loaded by OpenClaw and included below in Project Context.",
"These user-editable files are loaded by Moltbot and included below in Project Context.",
"",
...buildReplyTagsSection(isMinimal),
...buildMessagingSection({
@@ -576,7 +558,7 @@ export function buildAgentSystemPrompt(params: {
heartbeatPromptLine,
"If you receive a heartbeat poll (a user message matching the heartbeat prompt above), and there is nothing that needs attention, reply exactly:",
"HEARTBEAT_OK",
'OpenClaw treats a leading/trailing "HEARTBEAT_OK" as a heartbeat ack (and may discard it).',
'Moltbot treats a leading/trailing "HEARTBEAT_OK" as a heartbeat ack (and may discard it).',
'If something needs attention, do NOT include "HEARTBEAT_OK"; reply with the alert text instead.',
"",
);
@@ -0,0 +1,65 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
const stubManager = {
search: vi.fn(async () => [
{
path: "MEMORY.md",
startLine: 5,
endLine: 7,
score: 0.9,
snippet: "@@ -5,3 @@\nAssistant: noted",
source: "memory" as const,
},
]),
readFile: vi.fn(),
status: () => ({
backend: "builtin" as const,
files: 1,
chunks: 1,
dirty: false,
workspaceDir: "/workspace",
dbPath: "/workspace/.memory/index.sqlite",
provider: "builtin",
model: "builtin",
requestedProvider: "builtin",
sources: ["memory" as const],
sourceCounts: [{ source: "memory" as const, files: 1, chunks: 1 }],
}),
sync: vi.fn(),
probeVectorAvailability: vi.fn(async () => true),
close: vi.fn(),
};
vi.mock("../../memory/index.js", () => {
return {
getMemorySearchManager: async () => ({ manager: stubManager }),
};
});
import { createMemorySearchTool } from "./memory-tool.js";
beforeEach(() => {
vi.clearAllMocks();
});
describe("memory search citations", () => {
it("appends source information when citations are enabled", async () => {
const cfg = { memory: { citations: "on" }, agents: { list: [{ id: "main", default: true }] } };
const tool = createMemorySearchTool({ config: cfg });
if (!tool) throw new Error("tool missing");
const result = await tool.execute("call_citations_on", { query: "notes" });
const details = result.details as { results: Array<{ snippet: string; citation?: string }> };
expect(details.results[0]?.snippet).toMatch(/Source: MEMORY.md#L5-L7/);
expect(details.results[0]?.citation).toBe("MEMORY.md#L5-L7");
});
it("leaves snippet untouched when citations are off", async () => {
const cfg = { memory: { citations: "off" }, agents: { list: [{ id: "main", default: true }] } };
const tool = createMemorySearchTool({ config: cfg });
if (!tool) throw new Error("tool missing");
const result = await tool.execute("call_citations_off", { query: "notes" });
const details = result.details as { results: Array<{ snippet: string; citation?: string }> };
expect(details.results[0]?.snippet).not.toMatch(/Source:/);
expect(details.results[0]?.citation).toBeUndefined();
});
});
+42 -18
View File
@@ -1,9 +1,12 @@
import { Type } from "@sinclair/typebox";
import type { OpenClawConfig } from "../../config/config.js";
import type { AnyAgentTool } from "./common.js";
import type { MoltbotConfig } from "../../config/config.js";
import type { MemoryCitationsMode } from "../../config/types.memory.js";
import { getMemorySearchManager } from "../../memory/index.js";
import type { MemorySearchResult } from "../../memory/types.js";
import { resolveSessionAgentId } from "../agent-scope.js";
import { resolveMemorySearchConfig } from "../memory-search.js";
import type { AnyAgentTool } from "./common.js";
import { jsonResult, readNumberParam, readStringParam } from "./common.js";
const MemorySearchSchema = Type.Object({
@@ -19,20 +22,16 @@ const MemoryGetSchema = Type.Object({
});
export function createMemorySearchTool(options: {
config?: OpenClawConfig;
config?: MoltbotConfig;
agentSessionKey?: string;
}): AnyAgentTool | null {
const cfg = options.config;
if (!cfg) {
return null;
}
if (!cfg) return null;
const agentId = resolveSessionAgentId({
sessionKey: options.agentSessionKey,
config: cfg,
});
if (!resolveMemorySearchConfig(cfg, agentId)) {
return null;
}
if (!resolveMemorySearchConfig(cfg, agentId)) return null;
return {
label: "Memory Search",
name: "memory_search",
@@ -51,17 +50,21 @@ export function createMemorySearchTool(options: {
return jsonResult({ results: [], disabled: true, error });
}
try {
const results = await manager.search(query, {
const citationsMode = resolveMemoryCitationsMode(cfg);
const includeCitations = citationsMode !== "off";
const rawResults = await manager.search(query, {
maxResults,
minScore,
sessionKey: options.agentSessionKey,
});
const status = manager.status();
const results = decorateCitations(rawResults, includeCitations);
return jsonResult({
results,
provider: status.provider,
model: status.model,
fallback: status.fallback,
citations: citationsMode,
});
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
@@ -72,25 +75,21 @@ export function createMemorySearchTool(options: {
}
export function createMemoryGetTool(options: {
config?: OpenClawConfig;
config?: MoltbotConfig;
agentSessionKey?: string;
}): AnyAgentTool | null {
const cfg = options.config;
if (!cfg) {
return null;
}
if (!cfg) return null;
const agentId = resolveSessionAgentId({
sessionKey: options.agentSessionKey,
config: cfg,
});
if (!resolveMemorySearchConfig(cfg, agentId)) {
return null;
}
if (!resolveMemorySearchConfig(cfg, agentId)) return null;
return {
label: "Memory Get",
name: "memory_get",
description:
"Safe snippet read from MEMORY.md, memory/*.md, or configured memorySearch.extraPaths with optional from/lines; use after memory_search to pull only the needed lines and keep context small.",
"Safe snippet read from MEMORY.md or memory/*.md with optional from/lines; use after memory_search to pull only the needed lines and keep context small.",
parameters: MemoryGetSchema,
execute: async (_toolCallId, params) => {
const relPath = readStringParam(params, "path", { required: true });
@@ -117,3 +116,28 @@ export function createMemoryGetTool(options: {
},
};
}
function resolveMemoryCitationsMode(cfg: MoltbotConfig): MemoryCitationsMode {
const mode = cfg.memory?.citations;
if (mode === "on" || mode === "off" || mode === "auto") return mode;
return "auto";
}
function decorateCitations(results: MemorySearchResult[], include: boolean): MemorySearchResult[] {
if (!include) {
return results.map((entry) => ({ ...entry, citation: undefined }));
}
return results.map((entry) => {
const citation = formatCitation(entry);
const snippet = `${entry.snippet.trim()}\n\nSource: ${citation}`;
return { ...entry, citation, snippet };
});
}
function formatCitation(entry: MemorySearchResult): string {
const lineRange =
entry.startLine === entry.endLine
? `#L${entry.startLine}`
: `#L${entry.startLine}-L${entry.endLine}`;
return `${entry.path}${lineRange}`;
}
@@ -156,6 +156,7 @@ async function resolveContextReport(
ttsHint,
runtimeInfo,
sandboxInfo,
memoryCitationsMode: params.cfg?.memory?.citations,
});
return buildSystemPromptReport({
+14 -22
View File
@@ -1,5 +1,3 @@
import type { MemoryIndexManager } from "../memory/manager.js";
import type { RuntimeEnv } from "../runtime.js";
import { withProgress } from "../cli/progress.js";
import { loadConfig } from "../config/config.js";
import { buildGatewayConnectionDetails, callGateway } from "../gateway/call.js";
@@ -8,14 +6,17 @@ import { probeGateway } from "../gateway/probe.js";
import { collectChannelStatusIssues } from "../infra/channels-status-issues.js";
import { resolveOsSummary } from "../infra/os-summary.js";
import { getTailnetHostname } from "../infra/tailscale.js";
import { getMemorySearchManager } from "../memory/index.js";
import type { MemoryProviderStatus } from "../memory/types.js";
import { runExec } from "../process/exec.js";
import { buildChannelsTable } from "./status-all/channels.js";
import type { RuntimeEnv } from "../runtime.js";
import { getAgentLocalStatuses } from "./status.agent-local.js";
import { pickGatewaySelfPresence, resolveGatewayProbeAuth } from "./status.gateway-probe.js";
import { getStatusSummary } from "./status.summary.js";
import { getUpdateCheckResult } from "./status.update.js";
import { buildChannelsTable } from "./status-all/channels.js";
type MemoryStatusSnapshot = ReturnType<MemoryIndexManager["status"]> & {
type MemoryStatusSnapshot = MemoryProviderStatus & {
agentId: string;
};
@@ -27,9 +28,7 @@ type MemoryPluginStatus = {
function resolveMemoryPluginStatus(cfg: ReturnType<typeof loadConfig>): MemoryPluginStatus {
const pluginsEnabled = cfg.plugins?.enabled !== false;
if (!pluginsEnabled) {
return { enabled: false, slot: null, reason: "plugins disabled" };
}
if (!pluginsEnabled) return { enabled: false, slot: null, reason: "plugins disabled" };
const raw = typeof cfg.plugins?.slots?.memory === "string" ? cfg.plugins.slots.memory.trim() : "";
if (raw && raw.toLowerCase() === "none") {
return { enabled: false, slot: null, reason: 'plugins.slots.memory="none"' };
@@ -127,7 +126,7 @@ export async function scanStatus(
progress.setLabel("Querying channel status…");
const channelsStatus = gatewayReachable
? await callGateway({
? await callGateway<Record<string, unknown>>({
method: "channels.status",
params: {
probe: false,
@@ -142,31 +141,24 @@ export async function scanStatus(
progress.setLabel("Summarizing channels…");
const channels = await buildChannelsTable(cfg, {
// Show token previews in regular status; keep `status --all` redacted.
// Set `OPENCLAW_SHOW_SECRETS=0` to force redaction.
showSecrets: process.env.OPENCLAW_SHOW_SECRETS?.trim() !== "0",
// Set `CLAWDBOT_SHOW_SECRETS=0` to force redaction.
showSecrets: process.env.CLAWDBOT_SHOW_SECRETS?.trim() !== "0",
});
progress.tick();
progress.setLabel("Checking memory…");
const memoryPlugin = resolveMemoryPluginStatus(cfg);
const memory = await (async (): Promise<MemoryStatusSnapshot | null> => {
if (!memoryPlugin.enabled) {
return null;
}
if (memoryPlugin.slot !== "memory-core") {
return null;
}
if (!memoryPlugin.enabled) return null;
if (memoryPlugin.slot !== "memory-core") return null;
const agentId = agentStatus.defaultId ?? "main";
const { MemoryIndexManager } = await import("../memory/manager.js");
const manager = await MemoryIndexManager.get({ cfg, agentId }).catch(() => null);
if (!manager) {
return null;
}
const { manager } = await getMemorySearchManager({ cfg, agentId });
if (!manager) return null;
try {
await manager.probeVectorAvailability();
} catch {}
const status = manager.status();
await manager.close().catch(() => {});
await manager.close?.().catch(() => {});
return { agentId, ...status };
})();
progress.tick();
+52
View File
@@ -254,6 +254,27 @@ const FIELD_LABELS: Record<string, string> = {
"Memory Search Hybrid Candidate Multiplier",
"agents.defaults.memorySearch.cache.enabled": "Memory Search Embedding Cache",
"agents.defaults.memorySearch.cache.maxEntries": "Memory Search Embedding Cache Max Entries",
memory: "Memory",
"memory.backend": "Memory Backend",
"memory.citations": "Memory Citations Mode",
"memory.qmd.command": "QMD Binary",
"memory.qmd.includeDefaultMemory": "QMD Include Default Memory",
"memory.qmd.paths": "QMD Extra Paths",
"memory.qmd.paths.path": "QMD Path",
"memory.qmd.paths.pattern": "QMD Path Pattern",
"memory.qmd.paths.name": "QMD Path Name",
"memory.qmd.sessions.enabled": "QMD Session Indexing",
"memory.qmd.sessions.exportDir": "QMD Session Export Directory",
"memory.qmd.sessions.retentionDays": "QMD Session Retention (days)",
"memory.qmd.sessions.redactToolOutputs": "QMD Session Tool Redaction",
"memory.qmd.update.interval": "QMD Update Interval",
"memory.qmd.update.debounceMs": "QMD Update Debounce (ms)",
"memory.qmd.update.onBoot": "QMD Update on Startup",
"memory.qmd.limits.maxResults": "QMD Max Results",
"memory.qmd.limits.maxSnippetChars": "QMD Max Snippet Chars",
"memory.qmd.limits.maxInjectedChars": "QMD Max Injected Chars",
"memory.qmd.limits.timeoutMs": "QMD Search Timeout (ms)",
"memory.qmd.scope": "QMD Surface Scope",
"auth.profiles": "Auth Profiles",
"auth.order": "Auth Profile Order",
"auth.cooldowns.billingBackoffHours": "Billing Backoff (hours)",
@@ -548,6 +569,37 @@ const FIELD_HELP: Record<string, string> = {
"Multiplier for candidate pool size (default: 4).",
"agents.defaults.memorySearch.cache.enabled":
"Cache chunk embeddings in SQLite to speed up reindexing and frequent updates (default: true).",
memory: "Memory backend configuration (global).",
"memory.backend": 'Memory backend ("builtin" for Moltbot embeddings, "qmd" for QMD sidecar).',
"memory.citations": 'Default citation behavior ("auto", "on", or "off").',
"memory.qmd.command": "Path to the qmd binary (default: resolves from PATH).",
"memory.qmd.includeDefaultMemory":
"Whether to automatically index MEMORY.md + memory/**/*.md (default: true).",
"memory.qmd.paths":
"Additional directories/files to index with QMD (path + optional glob pattern).",
"memory.qmd.paths.path": "Absolute or ~-relative path to index via QMD.",
"memory.qmd.paths.pattern": "Glob pattern relative to the path root (default: **/*.md).",
"memory.qmd.paths.name":
"Optional stable name for the QMD collection (default derived from path).",
"memory.qmd.sessions.enabled":
"Enable QMD session transcript indexing (experimental, default: false).",
"memory.qmd.sessions.exportDir":
"Override directory for sanitized session exports before indexing.",
"memory.qmd.sessions.retentionDays":
"Retention window for exported sessions before pruning (default: unlimited).",
"memory.qmd.sessions.redactToolOutputs":
"Strip tool call payloads/results when exporting sessions (default: true).",
"memory.qmd.update.interval":
"How often the QMD sidecar refreshes indexes (duration string, default: 5m).",
"memory.qmd.update.debounceMs":
"Minimum delay between successive QMD refresh runs (default: 15000).",
"memory.qmd.update.onBoot": "Run QMD update once on gateway startup (default: true).",
"memory.qmd.limits.maxResults": "Max QMD results returned to the agent loop (default: 6).",
"memory.qmd.limits.maxSnippetChars": "Max characters per snippet pulled from QMD (default: 700).",
"memory.qmd.limits.maxInjectedChars": "Max total characters injected from QMD hits per turn.",
"memory.qmd.limits.timeoutMs": "Per-query timeout for QMD searches (default: 4000).",
"memory.qmd.scope":
"Session/channel scope for QMD recall (same syntax as session.sendPolicy; default: direct-only).",
"agents.defaults.memorySearch.cache.maxEntries":
"Optional cap on cached embeddings (best-effort).",
"agents.defaults.memorySearch.sync.onSearch":
+46
View File
@@ -0,0 +1,46 @@
import type { SessionSendPolicyConfig } from "./types.base.js";
export type MemoryBackend = "builtin" | "qmd";
export type MemoryCitationsMode = "auto" | "on" | "off";
export type MemoryConfig = {
backend?: MemoryBackend;
citations?: MemoryCitationsMode;
qmd?: MemoryQmdConfig;
};
export type MemoryQmdConfig = {
command?: string;
includeDefaultMemory?: boolean;
paths?: MemoryQmdIndexPath[];
sessions?: MemoryQmdSessionConfig;
update?: MemoryQmdUpdateConfig;
limits?: MemoryQmdLimitsConfig;
scope?: SessionSendPolicyConfig;
};
export type MemoryQmdIndexPath = {
path: string;
name?: string;
pattern?: string;
};
export type MemoryQmdSessionConfig = {
enabled?: boolean;
exportDir?: string;
retentionDays?: number;
redactToolOutputs?: boolean;
};
export type MemoryQmdUpdateConfig = {
interval?: string;
debounceMs?: number;
onBoot?: boolean;
};
export type MemoryQmdLimitsConfig = {
maxResults?: number;
maxSnippetChars?: number;
maxInjectedChars?: number;
timeoutMs?: number;
};
+2
View File
@@ -23,6 +23,7 @@ import type { NodeHostConfig } from "./types.node-host.js";
import type { PluginsConfig } from "./types.plugins.js";
import type { SkillsConfig } from "./types.skills.js";
import type { ToolsConfig } from "./types.tools.js";
import type { MemoryConfig } from "./types.memory.js";
export type OpenClawConfig = {
meta?: {
@@ -95,6 +96,7 @@ export type OpenClawConfig = {
canvasHost?: CanvasHostConfig;
talk?: TalkConfig;
gateway?: GatewayConfig;
memory?: MemoryConfig;
};
export type ConfigValidationIssue = {
+1
View File
@@ -28,3 +28,4 @@ export * from "./types.telegram.js";
export * from "./types.tts.js";
export * from "./types.tools.js";
export * from "./types.whatsapp.js";
export * from "./types.memory.js";
+26 -25
View File
@@ -15,6 +15,31 @@ const SessionResetConfigSchema = z
})
.strict();
export const SessionSendPolicySchema = z
.object({
default: z.union([z.literal("allow"), z.literal("deny")]).optional(),
rules: z
.array(
z
.object({
action: z.union([z.literal("allow"), z.literal("deny")]),
match: z
.object({
channel: z.string().optional(),
chatType: z
.union([z.literal("direct"), z.literal("group"), z.literal("channel")])
.optional(),
keyPrefix: z.string().optional(),
})
.strict()
.optional(),
})
.strict(),
)
.optional(),
})
.strict();
export const SessionSchema = z
.object({
scope: z.union([z.literal("per-sender"), z.literal("global")]).optional(),
@@ -50,31 +75,7 @@ export const SessionSchema = z
])
.optional(),
mainKey: z.string().optional(),
sendPolicy: z
.object({
default: z.union([z.literal("allow"), z.literal("deny")]).optional(),
rules: z
.array(
z
.object({
action: z.union([z.literal("allow"), z.literal("deny")]),
match: z
.object({
channel: z.string().optional(),
chatType: z
.union([z.literal("direct"), z.literal("group"), z.literal("channel")])
.optional(),
keyPrefix: z.string().optional(),
})
.strict()
.optional(),
})
.strict(),
)
.optional(),
})
.strict()
.optional(),
sendPolicy: SessionSendPolicySchema.optional(),
agentToAgent: z
.object({
maxPingPongTurns: z.number().int().min(0).max(5).optional(),
+69 -17
View File
@@ -1,11 +1,16 @@
import { z } from "zod";
import { ToolsSchema } from "./zod-schema.agent-runtime.js";
import { AgentsSchema, AudioSchema, BindingsSchema, BroadcastSchema } from "./zod-schema.agents.js";
import { ApprovalsSchema } from "./zod-schema.approvals.js";
import { AgentsSchema, AudioSchema, BindingsSchema, BroadcastSchema } from "./zod-schema.agents.js";
import { HexColorSchema, ModelsConfigSchema } from "./zod-schema.core.js";
import { HookMappingSchema, HooksGmailSchema, InternalHooksSchema } from "./zod-schema.hooks.js";
import { ChannelsSchema } from "./zod-schema.providers.js";
import { CommandsSchema, MessagesSchema, SessionSchema } from "./zod-schema.session.js";
import {
CommandsSchema,
MessagesSchema,
SessionSchema,
SessionSendPolicySchema,
} from "./zod-schema.session.js";
const BrowserSnapshotDefaultsSchema = z
.object({
@@ -27,7 +32,62 @@ const NodeHostSchema = z
.strict()
.optional();
export const OpenClawSchema = z
const MemoryQmdPathSchema = z
.object({
path: z.string(),
name: z.string().optional(),
pattern: z.string().optional(),
})
.strict();
const MemoryQmdSessionSchema = z
.object({
enabled: z.boolean().optional(),
exportDir: z.string().optional(),
retentionDays: z.number().int().nonnegative().optional(),
redactToolOutputs: z.boolean().optional(),
})
.strict();
const MemoryQmdUpdateSchema = z
.object({
interval: z.string().optional(),
debounceMs: z.number().int().nonnegative().optional(),
onBoot: z.boolean().optional(),
})
.strict();
const MemoryQmdLimitsSchema = z
.object({
maxResults: z.number().int().positive().optional(),
maxSnippetChars: z.number().int().positive().optional(),
maxInjectedChars: z.number().int().positive().optional(),
timeoutMs: z.number().int().nonnegative().optional(),
})
.strict();
const MemoryQmdSchema = z
.object({
command: z.string().optional(),
includeDefaultMemory: z.boolean().optional(),
paths: z.array(MemoryQmdPathSchema).optional(),
sessions: MemoryQmdSessionSchema.optional(),
update: MemoryQmdUpdateSchema.optional(),
limits: MemoryQmdLimitsSchema.optional(),
scope: SessionSendPolicySchema.optional(),
})
.strict();
const MemorySchema = z
.object({
backend: z.union([z.literal("builtin"), z.literal("qmd")]).optional(),
citations: z.union([z.literal("auto"), z.literal("on"), z.literal("off")]).optional(),
qmd: MemoryQmdSchema.optional(),
})
.strict()
.optional();
export const MoltbotSchema = z
.object({
meta: z
.object({
@@ -154,7 +214,7 @@ export const OpenClawSchema = z
.object({
cdpPort: z.number().int().min(1).max(65535).optional(),
cdpUrl: z.string().optional(),
driver: z.union([z.literal("openclaw"), z.literal("extension")]).optional(),
driver: z.union([z.literal("clawd"), z.literal("extension")]).optional(),
color: HexColorSchema,
})
.strict()
@@ -268,7 +328,6 @@ export const OpenClawSchema = z
wideArea: z
.object({
enabled: z.boolean().optional(),
domain: z.string().optional(),
})
.strict()
.optional(),
@@ -446,6 +505,7 @@ export const OpenClawSchema = z
})
.strict()
.optional(),
memory: MemorySchema,
skills: z
.object({
allowBundled: z.array(z.string()).optional(),
@@ -532,23 +592,15 @@ export const OpenClawSchema = z
.strict()
.superRefine((cfg, ctx) => {
const agents = cfg.agents?.list ?? [];
if (agents.length === 0) {
return;
}
if (agents.length === 0) return;
const agentIds = new Set(agents.map((agent) => agent.id));
const broadcast = cfg.broadcast;
if (!broadcast) {
return;
}
if (!broadcast) return;
for (const [peerId, ids] of Object.entries(broadcast)) {
if (peerId === "strategy") {
continue;
}
if (!Array.isArray(ids)) {
continue;
}
if (peerId === "strategy") continue;
if (!Array.isArray(ids)) continue;
for (let idx = 0; idx < ids.length; idx += 1) {
const agentId = ids[idx];
if (!agentIds.has(agentId)) {
+58
View File
@@ -0,0 +1,58 @@
import path from "node:path";
import { describe, expect, it } from "vitest";
import type { MoltbotConfig } from "../config/config.js";
import { resolveAgentWorkspaceDir } from "../agents/agent-scope.js";
import { resolveMemoryBackendConfig } from "./backend-config.js";
describe("resolveMemoryBackendConfig", () => {
it("defaults to builtin backend when config missing", () => {
const cfg = { agents: { defaults: { workspace: "/tmp/memory-test" } } } as MoltbotConfig;
const resolved = resolveMemoryBackendConfig({ cfg, agentId: "main" });
expect(resolved.backend).toBe("builtin");
expect(resolved.citations).toBe("auto");
expect(resolved.qmd).toBeUndefined();
});
it("resolves qmd backend with default collections", () => {
const cfg = {
agents: { defaults: { workspace: "/tmp/memory-test" } },
memory: {
backend: "qmd",
qmd: {},
},
} as MoltbotConfig;
const resolved = resolveMemoryBackendConfig({ cfg, agentId: "main" });
expect(resolved.backend).toBe("qmd");
expect(resolved.qmd?.collections.length).toBeGreaterThanOrEqual(3);
expect(resolved.qmd?.command).toBe("qmd");
expect(resolved.qmd?.update.intervalMs).toBeGreaterThan(0);
});
it("resolves custom paths relative to workspace", () => {
const cfg = {
agents: {
defaults: { workspace: "/workspace/root" },
list: [{ id: "main", workspace: "/workspace/root" }],
},
memory: {
backend: "qmd",
qmd: {
paths: [
{
path: "notes",
name: "custom-notes",
pattern: "**/*.md",
},
],
},
},
} as MoltbotConfig;
const resolved = resolveMemoryBackendConfig({ cfg, agentId: "main" });
const custom = resolved.qmd?.collections.find((c) => c.name.startsWith("custom-notes"));
expect(custom).toBeDefined();
const workspaceRoot = resolveAgentWorkspaceDir(cfg, "main");
expect(custom?.path).toBe(path.resolve(workspaceRoot, "notes"));
});
});
+245
View File
@@ -0,0 +1,245 @@
import path from "node:path";
import { parseDurationMs } from "../cli/parse-duration.js";
import { resolveAgentWorkspaceDir } from "../agents/agent-scope.js";
import type { MoltbotConfig } from "../config/config.js";
import type {
MemoryBackend,
MemoryCitationsMode,
MemoryQmdConfig,
MemoryQmdIndexPath,
} from "../config/types.memory.js";
import type { SessionSendPolicyConfig } from "../config/types.base.js";
import { resolveUserPath } from "../utils.js";
export type ResolvedMemoryBackendConfig = {
backend: MemoryBackend;
citations: MemoryCitationsMode;
qmd?: ResolvedQmdConfig;
};
export type ResolvedQmdCollection = {
name: string;
path: string;
pattern: string;
kind: "memory" | "custom" | "sessions";
};
export type ResolvedQmdUpdateConfig = {
intervalMs: number;
debounceMs: number;
onBoot: boolean;
};
export type ResolvedQmdLimitsConfig = {
maxResults: number;
maxSnippetChars: number;
maxInjectedChars: number;
timeoutMs: number;
};
export type ResolvedQmdSessionConfig = {
enabled: boolean;
exportDir?: string;
retentionDays?: number;
redactToolOutputs: boolean;
};
export type ResolvedQmdConfig = {
command: string;
collections: ResolvedQmdCollection[];
sessions: ResolvedQmdSessionConfig;
update: ResolvedQmdUpdateConfig;
limits: ResolvedQmdLimitsConfig;
includeDefaultMemory: boolean;
scope?: SessionSendPolicyConfig;
};
const DEFAULT_BACKEND: MemoryBackend = "builtin";
const DEFAULT_CITATIONS: MemoryCitationsMode = "auto";
const DEFAULT_QMD_INTERVAL = "5m";
const DEFAULT_QMD_DEBOUNCE_MS = 15_000;
const DEFAULT_QMD_TIMEOUT_MS = 4_000;
const DEFAULT_QMD_LIMITS: ResolvedQmdLimitsConfig = {
maxResults: 6,
maxSnippetChars: 700,
maxInjectedChars: 4_000,
timeoutMs: DEFAULT_QMD_TIMEOUT_MS,
};
const DEFAULT_QMD_SCOPE: SessionSendPolicyConfig = {
default: "deny",
rules: [
{
action: "allow",
match: { chatType: "direct" },
},
],
};
function sanitizeName(input: string): string {
const lower = input.toLowerCase().replace(/[^a-z0-9-]+/g, "-");
const trimmed = lower.replace(/^-+|-+$/g, "");
return trimmed || "collection";
}
function ensureUniqueName(base: string, existing: Set<string>): string {
let name = sanitizeName(base);
if (!existing.has(name)) {
existing.add(name);
return name;
}
let suffix = 2;
while (existing.has(`${name}-${suffix}`)) {
suffix += 1;
}
const unique = `${name}-${suffix}`;
existing.add(unique);
return unique;
}
function resolvePath(raw: string, workspaceDir: string): string {
const trimmed = raw.trim();
if (!trimmed) throw new Error("path required");
if (trimmed.startsWith("~") || path.isAbsolute(trimmed)) {
return path.normalize(resolveUserPath(trimmed));
}
return path.normalize(path.resolve(workspaceDir, trimmed));
}
function resolveIntervalMs(raw: string | undefined): number {
const value = raw?.trim();
if (!value) return parseDurationMs(DEFAULT_QMD_INTERVAL, { defaultUnit: "m" });
try {
return parseDurationMs(value, { defaultUnit: "m" });
} catch {
return parseDurationMs(DEFAULT_QMD_INTERVAL, { defaultUnit: "m" });
}
}
function resolveDebounceMs(raw: number | undefined): number {
if (typeof raw === "number" && Number.isFinite(raw) && raw >= 0) {
return Math.floor(raw);
}
return DEFAULT_QMD_DEBOUNCE_MS;
}
function resolveLimits(raw?: MemoryQmdConfig["limits"]): ResolvedQmdLimitsConfig {
const parsed: ResolvedQmdLimitsConfig = { ...DEFAULT_QMD_LIMITS };
if (raw?.maxResults && raw.maxResults > 0) parsed.maxResults = Math.floor(raw.maxResults);
if (raw?.maxSnippetChars && raw.maxSnippetChars > 0) {
parsed.maxSnippetChars = Math.floor(raw.maxSnippetChars);
}
if (raw?.maxInjectedChars && raw.maxInjectedChars > 0) {
parsed.maxInjectedChars = Math.floor(raw.maxInjectedChars);
}
if (raw?.timeoutMs && raw.timeoutMs > 0) {
parsed.timeoutMs = Math.floor(raw.timeoutMs);
}
return parsed;
}
function resolveSessionConfig(
cfg: MemoryQmdConfig["sessions"],
workspaceDir: string,
): ResolvedQmdSessionConfig {
const enabled = Boolean(cfg?.enabled);
const exportDirRaw = cfg?.exportDir?.trim();
const exportDir = exportDirRaw ? resolvePath(exportDirRaw, workspaceDir) : undefined;
const retentionDays =
cfg?.retentionDays && cfg.retentionDays > 0 ? Math.floor(cfg.retentionDays) : undefined;
const redactToolOutputs = cfg?.redactToolOutputs !== false;
return {
enabled,
exportDir,
retentionDays,
redactToolOutputs,
};
}
function resolveCustomPaths(
rawPaths: MemoryQmdIndexPath[] | undefined,
workspaceDir: string,
existing: Set<string>,
): ResolvedQmdCollection[] {
if (!rawPaths?.length) return [];
const collections: ResolvedQmdCollection[] = [];
rawPaths.forEach((entry, index) => {
const trimmedPath = entry?.path?.trim();
if (!trimmedPath) return;
let resolved: string;
try {
resolved = resolvePath(trimmedPath, workspaceDir);
} catch {
return;
}
const pattern = entry.pattern?.trim() || "**/*.md";
const baseName = entry.name?.trim() || `custom-${index + 1}`;
const name = ensureUniqueName(baseName, existing);
collections.push({
name,
path: resolved,
pattern,
kind: "custom",
});
});
return collections;
}
function resolveDefaultCollections(
include: boolean,
workspaceDir: string,
existing: Set<string>,
): ResolvedQmdCollection[] {
if (!include) return [];
const entries: Array<{ path: string; pattern: string; base: string }> = [
{ path: workspaceDir, pattern: "MEMORY.md", base: "memory-root" },
{ path: workspaceDir, pattern: "memory.md", base: "memory-alt" },
{ path: path.join(workspaceDir, "memory"), pattern: "**/*.md", base: "memory-dir" },
];
return entries.map((entry) => ({
name: ensureUniqueName(entry.base, existing),
path: entry.path,
pattern: entry.pattern,
kind: "memory",
}));
}
export function resolveMemoryBackendConfig(params: {
cfg: MoltbotConfig;
agentId: string;
}): ResolvedMemoryBackendConfig {
const backend = params.cfg.memory?.backend ?? DEFAULT_BACKEND;
const citations = params.cfg.memory?.citations ?? DEFAULT_CITATIONS;
if (backend !== "qmd") {
return { backend: "builtin", citations };
}
const workspaceDir = resolveAgentWorkspaceDir(params.cfg, params.agentId);
const qmdCfg = params.cfg.memory?.qmd;
const includeDefaultMemory = qmdCfg?.includeDefaultMemory !== false;
const nameSet = new Set<string>();
const collections = [
...resolveDefaultCollections(includeDefaultMemory, workspaceDir, nameSet),
...resolveCustomPaths(qmdCfg?.paths, workspaceDir, nameSet),
];
const resolved: ResolvedQmdConfig = {
command: qmdCfg?.command?.trim() || "qmd",
collections,
includeDefaultMemory,
sessions: resolveSessionConfig(qmdCfg?.sessions, workspaceDir),
update: {
intervalMs: resolveIntervalMs(qmdCfg?.update?.interval),
debounceMs: resolveDebounceMs(qmdCfg?.update?.debounceMs),
onBoot: qmdCfg?.update?.onBoot !== false,
},
limits: resolveLimits(qmdCfg?.limits),
scope: qmdCfg?.scope ?? DEFAULT_QMD_SCOPE,
};
return {
backend: "qmd",
citations,
qmd: resolved,
};
}
+2 -1
View File
@@ -1,2 +1,3 @@
export type { MemoryIndexManager, MemorySearchResult } from "./manager.js";
export { MemoryIndexManager } from "./manager.js";
export type { MemorySearchResult, MemorySearchManager } from "./types.js";
export { getMemorySearchManager, type MemorySearchManagerResult } from "./search-manager.js";
+125 -384
View File
@@ -1,25 +1,18 @@
import type { DatabaseSync } from "node:sqlite";
import chokidar, { type FSWatcher } from "chokidar";
import { randomUUID } from "node:crypto";
import fsSync from "node:fs";
import fs from "node:fs/promises";
import path from "node:path";
import type { ResolvedMemorySearchConfig } from "../agents/memory-search.js";
import type { OpenClawConfig } from "../config/config.js";
import type { DatabaseSync } from "node:sqlite";
import chokidar, { type FSWatcher } from "chokidar";
import { resolveAgentDir, resolveAgentWorkspaceDir } from "../agents/agent-scope.js";
import type { ResolvedMemorySearchConfig } from "../agents/memory-search.js";
import { resolveMemorySearchConfig } from "../agents/memory-search.js";
import type { MoltbotConfig } from "../config/config.js";
import { resolveSessionTranscriptsDirForAgent } from "../config/sessions/paths.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import { onSessionTranscriptUpdate } from "../sessions/transcript-events.js";
import { resolveUserPath } from "../utils.js";
import { runGeminiEmbeddingBatches, type GeminiBatchRequest } from "./batch-gemini.js";
import {
OPENAI_BATCH_ENDPOINT,
type OpenAiBatchRequest,
runOpenAiEmbeddingBatches,
} from "./batch-openai.js";
import { DEFAULT_GEMINI_EMBEDDING_MODEL } from "./embeddings-gemini.js";
import { DEFAULT_OPENAI_EMBEDDING_MODEL } from "./embeddings-openai.js";
import {
createEmbeddingProvider,
type EmbeddingProvider,
@@ -27,7 +20,14 @@ import {
type GeminiEmbeddingClient,
type OpenAiEmbeddingClient,
} from "./embeddings.js";
import { bm25RankToScore, buildFtsQuery, mergeHybridResults } from "./hybrid.js";
import { DEFAULT_GEMINI_EMBEDDING_MODEL } from "./embeddings-gemini.js";
import { DEFAULT_OPENAI_EMBEDDING_MODEL } from "./embeddings-openai.js";
import {
OPENAI_BATCH_ENDPOINT,
type OpenAiBatchRequest,
runOpenAiEmbeddingBatches,
} from "./batch-openai.js";
import { runGeminiEmbeddingBatches, type GeminiBatchRequest } from "./batch-gemini.js";
import {
buildFileEntry,
chunkMarkdown,
@@ -35,26 +35,23 @@ import {
hashText,
isMemoryPath,
listMemoryFiles,
normalizeExtraMemoryPaths,
type MemoryChunk,
type MemoryFileEntry,
normalizeRelPath,
parseEmbedding,
} from "./internal.js";
import { bm25RankToScore, buildFtsQuery, mergeHybridResults } from "./hybrid.js";
import { searchKeyword, searchVector } from "./manager-search.js";
import { ensureMemoryIndexSchema } from "./memory-schema.js";
import { loadSqliteVecExtension } from "./sqlite-vec.js";
import { requireNodeSqlite } from "./sqlite.js";
type MemorySource = "memory" | "sessions";
export type MemorySearchResult = {
path: string;
startLine: number;
endLine: number;
score: number;
snippet: string;
source: MemorySource;
};
import { loadSqliteVecExtension } from "./sqlite-vec.js";
import type {
MemoryProviderStatus,
MemorySearchManager,
MemorySearchResult,
MemorySource,
MemorySyncProgressUpdate,
} from "./types.js";
type MemoryIndexMeta = {
model: string;
@@ -74,12 +71,6 @@ type SessionFileEntry = {
content: string;
};
type MemorySyncProgressUpdate = {
completed: number;
total: number;
label?: string;
};
type MemorySyncProgressState = {
completed: number;
total: number;
@@ -114,9 +105,9 @@ const INDEX_CACHE = new Map<string, MemoryIndexManager>();
const vectorToBlob = (embedding: number[]): Buffer =>
Buffer.from(new Float32Array(embedding).buffer);
export class MemoryIndexManager {
export class MemoryIndexManager implements MemorySearchManager {
private readonly cacheKey: string;
private readonly cfg: OpenClawConfig;
private readonly cfg: MoltbotConfig;
private readonly agentId: string;
private readonly workspaceDir: string;
private readonly settings: ResolvedMemorySearchConfig;
@@ -172,20 +163,16 @@ export class MemoryIndexManager {
private syncing: Promise<void> | null = null;
static async get(params: {
cfg: OpenClawConfig;
cfg: MoltbotConfig;
agentId: string;
}): Promise<MemoryIndexManager | null> {
const { cfg, agentId } = params;
const settings = resolveMemorySearchConfig(cfg, agentId);
if (!settings) {
return null;
}
if (!settings) return null;
const workspaceDir = resolveAgentWorkspaceDir(cfg, agentId);
const key = `${agentId}:${workspaceDir}:${JSON.stringify(settings)}`;
const existing = INDEX_CACHE.get(key);
if (existing) {
return existing;
}
if (existing) return existing;
const providerResult = await createEmbeddingProvider({
config: cfg,
agentDir: resolveAgentDir(cfg, agentId),
@@ -209,7 +196,7 @@ export class MemoryIndexManager {
private constructor(params: {
cacheKey: string;
cfg: OpenClawConfig;
cfg: MoltbotConfig;
agentId: string;
workspaceDir: string;
settings: ResolvedMemorySearchConfig;
@@ -252,19 +239,13 @@ export class MemoryIndexManager {
}
async warmSession(sessionKey?: string): Promise<void> {
if (!this.settings.sync.onSessionStart) {
return;
}
if (!this.settings.sync.onSessionStart) return;
const key = sessionKey?.trim() || "";
if (key && this.sessionWarm.has(key)) {
return;
}
if (key && this.sessionWarm.has(key)) return;
void this.sync({ reason: "session-start" }).catch((err) => {
log.warn(`memory sync failed (session-start): ${String(err)}`);
});
if (key) {
this.sessionWarm.add(key);
}
if (key) this.sessionWarm.add(key);
}
async search(
@@ -282,9 +263,7 @@ export class MemoryIndexManager {
});
}
const cleaned = query.trim();
if (!cleaned) {
return [];
}
if (!cleaned) return [];
const minScore = opts?.minScore ?? this.settings.query.minScore;
const maxResults = opts?.maxResults ?? this.settings.query.maxResults;
const hybrid = this.settings.query.hybrid;
@@ -343,9 +322,7 @@ export class MemoryIndexManager {
query: string,
limit: number,
): Promise<Array<MemorySearchResult & { id: string; textScore: number }>> {
if (!this.fts.enabled || !this.fts.available) {
return [];
}
if (!this.fts.enabled || !this.fts.available) return [];
const sourceFilter = this.buildSourceFilter();
const results = await searchKeyword({
db: this.db,
@@ -397,9 +374,7 @@ export class MemoryIndexManager {
force?: boolean;
progress?: (update: MemorySyncProgressUpdate) => void;
}): Promise<void> {
if (this.syncing) {
return this.syncing;
}
if (this.syncing) return this.syncing;
this.syncing = this.runSync(params).finally(() => {
this.syncing = null;
});
@@ -411,54 +386,13 @@ export class MemoryIndexManager {
from?: number;
lines?: number;
}): Promise<{ text: string; path: string }> {
const rawPath = params.relPath.trim();
if (!rawPath) {
const relPath = normalizeRelPath(params.relPath);
if (!relPath || !isMemoryPath(relPath)) {
throw new Error("path required");
}
const absPath = path.isAbsolute(rawPath)
? path.resolve(rawPath)
: path.resolve(this.workspaceDir, rawPath);
const relPath = path.relative(this.workspaceDir, absPath).replace(/\\/g, "/");
const inWorkspace =
relPath.length > 0 && !relPath.startsWith("..") && !path.isAbsolute(relPath);
const allowedWorkspace = inWorkspace && isMemoryPath(relPath);
let allowedAdditional = false;
if (!allowedWorkspace && this.settings.extraPaths.length > 0) {
const additionalPaths = normalizeExtraMemoryPaths(
this.workspaceDir,
this.settings.extraPaths,
);
for (const additionalPath of additionalPaths) {
try {
const stat = await fs.lstat(additionalPath);
if (stat.isSymbolicLink()) {
continue;
}
if (stat.isDirectory()) {
if (absPath === additionalPath || absPath.startsWith(`${additionalPath}${path.sep}`)) {
allowedAdditional = true;
break;
}
continue;
}
if (stat.isFile()) {
if (absPath === additionalPath && absPath.endsWith(".md")) {
allowedAdditional = true;
break;
}
}
} catch {}
}
}
if (!allowedWorkspace && !allowedAdditional) {
throw new Error("path required");
}
if (!absPath.endsWith(".md")) {
throw new Error("path required");
}
const stat = await fs.lstat(absPath);
if (stat.isSymbolicLink() || !stat.isFile()) {
throw new Error("path required");
const absPath = path.resolve(this.workspaceDir, relPath);
if (!absPath.startsWith(this.workspaceDir)) {
throw new Error("path escapes workspace");
}
const content = await fs.readFile(absPath, "utf-8");
if (!params.from && !params.lines) {
@@ -471,40 +405,7 @@ export class MemoryIndexManager {
return { text: slice.join("\n"), path: relPath };
}
status(): {
files: number;
chunks: number;
dirty: boolean;
workspaceDir: string;
dbPath: string;
provider: string;
model: string;
requestedProvider: string;
sources: MemorySource[];
extraPaths: string[];
sourceCounts: Array<{ source: MemorySource; files: number; chunks: number }>;
cache?: { enabled: boolean; entries?: number; maxEntries?: number };
fts?: { enabled: boolean; available: boolean; error?: string };
fallback?: { from: string; reason?: string };
vector?: {
enabled: boolean;
available?: boolean;
extensionPath?: string;
loadError?: string;
dims?: number;
};
batch?: {
enabled: boolean;
failures: number;
limit: number;
wait: boolean;
concurrency: number;
pollIntervalMs: number;
timeoutMs: number;
lastError?: string;
lastProvider?: string;
};
} {
status(): MemoryProviderStatus {
const sourceFilter = this.buildSourceFilter();
const files = this.db
.prepare(`SELECT COUNT(*) as c FROM files WHERE 1=1${sourceFilter.sql}`)
@@ -518,9 +419,7 @@ export class MemoryIndexManager {
};
const sourceCounts = (() => {
const sources = Array.from(this.sources);
if (sources.length === 0) {
return [];
}
if (sources.length === 0) return [];
const bySource = new Map<MemorySource, { files: number; chunks: number }>();
for (const source of sources) {
bySource.set(source, { files: 0, chunks: 0 });
@@ -545,19 +444,19 @@ export class MemoryIndexManager {
entry.chunks = row.c ?? 0;
bySource.set(row.source, entry);
}
return sources.map((source) => Object.assign({ source }, bySource.get(source)!));
return sources.map((source) => ({ source, ...bySource.get(source)! }));
})();
return {
backend: "builtin",
files: files?.c ?? 0,
chunks: chunks?.c ?? 0,
dirty: this.dirty,
dirty: this.dirty || this.sessionsDirty,
workspaceDir: this.workspaceDir,
dbPath: this.settings.store.path,
provider: this.provider.id,
model: this.provider.model,
requestedProvider: this.requestedProvider,
sources: Array.from(this.sources),
extraPaths: this.settings.extraPaths,
sourceCounts,
cache: this.cache.enabled
? {
@@ -601,9 +500,7 @@ export class MemoryIndexManager {
}
async probeVectorAvailability(): Promise<boolean> {
if (!this.vector.enabled) {
return false;
}
if (!this.vector.enabled) return false;
return this.ensureVectorReady();
}
@@ -618,9 +515,7 @@ export class MemoryIndexManager {
}
async close(): Promise<void> {
if (this.closed) {
return;
}
if (this.closed) return;
this.closed = true;
if (this.watchTimer) {
clearTimeout(this.watchTimer);
@@ -647,9 +542,7 @@ export class MemoryIndexManager {
}
private async ensureVectorReady(dimensions?: number): Promise<boolean> {
if (!this.vector.enabled) {
return false;
}
if (!this.vector.enabled) return false;
if (!this.vectorReady) {
this.vectorReady = this.withTimeout(
this.loadVectorExtension(),
@@ -675,9 +568,7 @@ export class MemoryIndexManager {
}
private async loadVectorExtension(): Promise<boolean> {
if (this.vector.available !== null) {
return this.vector.available;
}
if (this.vector.available !== null) return this.vector.available;
if (!this.vector.enabled) {
this.vector.available = false;
return false;
@@ -687,9 +578,7 @@ export class MemoryIndexManager {
? resolveUserPath(this.vector.extensionPath)
: undefined;
const loaded = await loadSqliteVecExtension({ db: this.db, extensionPath: resolvedPath });
if (!loaded.ok) {
throw new Error(loaded.error ?? "unknown sqlite-vec load error");
}
if (!loaded.ok) throw new Error(loaded.error ?? "unknown sqlite-vec load error");
this.vector.extensionPath = loaded.extensionPath;
this.vector.available = true;
return true;
@@ -703,9 +592,7 @@ export class MemoryIndexManager {
}
private ensureVectorTable(dimensions: number): void {
if (this.vector.dims === dimensions) {
return;
}
if (this.vector.dims === dimensions) return;
if (this.vector.dims && this.vector.dims !== dimensions) {
this.dropVectorTable();
}
@@ -729,9 +616,7 @@ export class MemoryIndexManager {
private buildSourceFilter(alias?: string): { sql: string; params: MemorySource[] } {
const sources = Array.from(this.sources);
if (sources.length === 0) {
return { sql: "", params: [] };
}
if (sources.length === 0) return { sql: "", params: [] };
const column = alias ? `${alias}.source` : "source";
const placeholders = sources.map(() => "?").join(", ");
return { sql: ` AND ${column} IN (${placeholders})`, params: sources };
@@ -750,9 +635,7 @@ export class MemoryIndexManager {
}
private seedEmbeddingCache(sourceDb: DatabaseSync): void {
if (!this.cache.enabled) {
return;
}
if (!this.cache.enabled) return;
try {
const rows = sourceDb
.prepare(
@@ -767,9 +650,7 @@ export class MemoryIndexManager {
dims: number | null;
updated_at: number;
}>;
if (!rows.length) {
return;
}
if (!rows.length) return;
const insert = this.db.prepare(
`INSERT INTO ${EMBEDDING_CACHE_TABLE} (provider, model, provider_key, hash, embedding, dims, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
@@ -846,26 +727,12 @@ export class MemoryIndexManager {
}
private ensureWatcher() {
if (!this.sources.has("memory") || !this.settings.sync.watch || this.watcher) {
return;
}
const additionalPaths = normalizeExtraMemoryPaths(this.workspaceDir, this.settings.extraPaths)
.map((entry) => {
try {
const stat = fsSync.lstatSync(entry);
return stat.isSymbolicLink() ? null : entry;
} catch {
return null;
}
})
.filter((entry): entry is string => Boolean(entry));
const watchPaths = new Set<string>([
if (!this.sources.has("memory") || !this.settings.sync.watch || this.watcher) return;
const watchPaths = [
path.join(this.workspaceDir, "MEMORY.md"),
path.join(this.workspaceDir, "memory.md"),
path.join(this.workspaceDir, "memory"),
...additionalPaths,
]);
this.watcher = chokidar.watch(Array.from(watchPaths), {
];
this.watcher = chokidar.watch(watchPaths, {
ignoreInitial: true,
awaitWriteFinish: {
stabilityThreshold: this.settings.sync.watchDebounceMs,
@@ -882,26 +749,18 @@ export class MemoryIndexManager {
}
private ensureSessionListener() {
if (!this.sources.has("sessions") || this.sessionUnsubscribe) {
return;
}
if (!this.sources.has("sessions") || this.sessionUnsubscribe) return;
this.sessionUnsubscribe = onSessionTranscriptUpdate((update) => {
if (this.closed) {
return;
}
if (this.closed) return;
const sessionFile = update.sessionFile;
if (!this.isSessionFileForAgent(sessionFile)) {
return;
}
if (!this.isSessionFileForAgent(sessionFile)) return;
this.scheduleSessionDirty(sessionFile);
});
}
private scheduleSessionDirty(sessionFile: string) {
this.sessionPendingFiles.add(sessionFile);
if (this.sessionWatchTimer) {
return;
}
if (this.sessionWatchTimer) return;
this.sessionWatchTimer = setTimeout(() => {
this.sessionWatchTimer = null;
void this.processSessionDeltaBatch().catch((err) => {
@@ -911,17 +770,13 @@ export class MemoryIndexManager {
}
private async processSessionDeltaBatch(): Promise<void> {
if (this.sessionPendingFiles.size === 0) {
return;
}
if (this.sessionPendingFiles.size === 0) return;
const pending = Array.from(this.sessionPendingFiles);
this.sessionPendingFiles.clear();
let shouldSync = false;
for (const sessionFile of pending) {
const delta = await this.updateSessionDelta(sessionFile);
if (!delta) {
continue;
}
if (!delta) continue;
const bytesThreshold = delta.deltaBytes;
const messagesThreshold = delta.deltaMessages;
const bytesHit =
@@ -930,9 +785,7 @@ export class MemoryIndexManager {
messagesThreshold <= 0
? delta.pendingMessages > 0
: delta.pendingMessages >= messagesThreshold;
if (!bytesHit && !messagesHit) {
continue;
}
if (!bytesHit && !messagesHit) continue;
this.sessionsDirtyFiles.add(sessionFile);
this.sessionsDirty = true;
delta.pendingBytes =
@@ -955,9 +808,7 @@ export class MemoryIndexManager {
pendingMessages: number;
} | null> {
const thresholds = this.settings.sync.sessions;
if (!thresholds) {
return null;
}
if (!thresholds) return null;
let stat: { size: number };
try {
stat = await fs.stat(sessionFile);
@@ -1008,9 +859,7 @@ export class MemoryIndexManager {
}
private async countNewlines(absPath: string, start: number, end: number): Promise<number> {
if (end <= start) {
return 0;
}
if (end <= start) return 0;
const handle = await fs.open(absPath, "r");
try {
let offset = start;
@@ -1019,13 +868,9 @@ export class MemoryIndexManager {
while (offset < end) {
const toRead = Math.min(buffer.length, end - offset);
const { bytesRead } = await handle.read(buffer, 0, toRead, offset);
if (bytesRead <= 0) {
break;
}
if (bytesRead <= 0) break;
for (let i = 0; i < bytesRead; i += 1) {
if (buffer[i] === 10) {
count += 1;
}
if (buffer[i] === 10) count += 1;
}
offset += bytesRead;
}
@@ -1037,18 +882,14 @@ export class MemoryIndexManager {
private resetSessionDelta(absPath: string, size: number): void {
const state = this.sessionDeltas.get(absPath);
if (!state) {
return;
}
if (!state) return;
state.lastSize = size;
state.pendingBytes = 0;
state.pendingMessages = 0;
}
private isSessionFileForAgent(sessionFile: string): boolean {
if (!sessionFile) {
return false;
}
if (!sessionFile) return false;
const sessionsDir = resolveSessionTranscriptsDirForAgent(this.agentId);
const resolvedFile = path.resolve(sessionFile);
const resolvedDir = path.resolve(sessionsDir);
@@ -1057,9 +898,7 @@ export class MemoryIndexManager {
private ensureIntervalSync() {
const minutes = this.settings.sync.intervalMinutes;
if (!minutes || minutes <= 0 || this.intervalTimer) {
return;
}
if (!minutes || minutes <= 0 || this.intervalTimer) return;
const ms = minutes * 60 * 1000;
this.intervalTimer = setInterval(() => {
void this.sync({ reason: "interval" }).catch((err) => {
@@ -1069,12 +908,8 @@ export class MemoryIndexManager {
}
private scheduleWatchSync() {
if (!this.sources.has("memory") || !this.settings.sync.watch) {
return;
}
if (this.watchTimer) {
clearTimeout(this.watchTimer);
}
if (!this.sources.has("memory") || !this.settings.sync.watch) return;
if (this.watchTimer) clearTimeout(this.watchTimer);
this.watchTimer = setTimeout(() => {
this.watchTimer = null;
void this.sync({ reason: "watch" }).catch((err) => {
@@ -1087,19 +922,11 @@ export class MemoryIndexManager {
params?: { reason?: string; force?: boolean },
needsFullReindex = false,
) {
if (!this.sources.has("sessions")) {
return false;
}
if (params?.force) {
return true;
}
if (!this.sources.has("sessions")) return false;
if (params?.force) return true;
const reason = params?.reason;
if (reason === "session-start" || reason === "watch") {
return false;
}
if (needsFullReindex) {
return true;
}
if (reason === "session-start" || reason === "watch") return false;
if (needsFullReindex) return true;
return this.sessionsDirty && this.sessionsDirtyFiles.size > 0;
}
@@ -1107,7 +934,7 @@ export class MemoryIndexManager {
needsFullReindex: boolean;
progress?: MemorySyncProgressState;
}) {
const files = await listMemoryFiles(this.workspaceDir, this.settings.extraPaths);
const files = await listMemoryFiles(this.workspaceDir);
const fileEntries = await Promise.all(
files.map(async (file) => buildFileEntry(file, this.workspaceDir)),
);
@@ -1156,9 +983,7 @@ export class MemoryIndexManager {
.prepare(`SELECT path FROM files WHERE source = ?`)
.all("memory") as Array<{ path: string }>;
for (const stale of staleRows) {
if (activePaths.has(stale.path)) {
continue;
}
if (activePaths.has(stale.path)) continue;
this.db.prepare(`DELETE FROM files WHERE path = ? AND source = ?`).run(stale.path, "memory");
try {
this.db
@@ -1253,9 +1078,7 @@ export class MemoryIndexManager {
.prepare(`SELECT path FROM files WHERE source = ?`)
.all("sessions") as Array<{ path: string }>;
for (const stale of staleRows) {
if (activePaths.has(stale.path)) {
continue;
}
if (activePaths.has(stale.path)) continue;
this.db
.prepare(`DELETE FROM files WHERE path = ? AND source = ?`)
.run(stale.path, "sessions");
@@ -1287,9 +1110,7 @@ export class MemoryIndexManager {
total: 0,
label: undefined,
report: (update) => {
if (update.label) {
state.label = update.label;
}
if (update.label) state.label = update.label;
const label =
update.total > 0 && state.label
? `${state.label} ${update.completed}/${update.total}`
@@ -1400,12 +1221,8 @@ export class MemoryIndexManager {
private async activateFallbackProvider(reason: string): Promise<boolean> {
const fallback = this.settings.fallback;
if (!fallback || fallback === "none" || fallback === this.provider.id) {
return false;
}
if (this.fallbackFrom) {
return false;
}
if (!fallback || fallback === "none" || fallback === this.provider.id) return false;
if (this.fallbackFrom) return false;
const fallbackFrom = this.provider.id as "openai" | "gemini" | "local";
const fallbackModel =
@@ -1557,9 +1374,7 @@ export class MemoryIndexManager {
const row = this.db.prepare(`SELECT value FROM meta WHERE key = ?`).get(META_KEY) as
| { value: string }
| undefined;
if (!row?.value) {
return null;
}
if (!row?.value) return null;
try {
return JSON.parse(row.value) as MemoryIndexMeta;
} catch {
@@ -1606,26 +1421,16 @@ export class MemoryIndexManager {
const normalized = this.normalizeSessionText(content);
return normalized ? normalized : null;
}
if (!Array.isArray(content)) {
return null;
}
if (!Array.isArray(content)) return null;
const parts: string[] = [];
for (const block of content) {
if (!block || typeof block !== "object") {
continue;
}
if (!block || typeof block !== "object") continue;
const record = block as { type?: unknown; text?: unknown };
if (record.type !== "text" || typeof record.text !== "string") {
continue;
}
if (record.type !== "text" || typeof record.text !== "string") continue;
const normalized = this.normalizeSessionText(record.text);
if (normalized) {
parts.push(normalized);
}
}
if (parts.length === 0) {
return null;
if (normalized) parts.push(normalized);
}
if (parts.length === 0) return null;
return parts.join(" ");
}
@@ -1636,9 +1441,7 @@ export class MemoryIndexManager {
const lines = raw.split("\n");
const collected: string[] = [];
for (const line of lines) {
if (!line.trim()) {
continue;
}
if (!line.trim()) continue;
let record: unknown;
try {
record = JSON.parse(line);
@@ -1655,16 +1458,10 @@ export class MemoryIndexManager {
const message = (record as { message?: unknown }).message as
| { role?: unknown; content?: unknown }
| undefined;
if (!message || typeof message.role !== "string") {
continue;
}
if (message.role !== "user" && message.role !== "assistant") {
continue;
}
if (!message || typeof message.role !== "string") continue;
if (message.role !== "user" && message.role !== "assistant") continue;
const text = this.extractSessionText(message.content);
if (!text) {
continue;
}
if (!text) continue;
const label = message.role === "user" ? "User" : "Assistant";
collected.push(`${label}: ${text}`);
}
@@ -1684,9 +1481,7 @@ export class MemoryIndexManager {
}
private estimateEmbeddingTokens(text: string): number {
if (!text) {
return 0;
}
if (!text) return 0;
return Math.ceil(text.length / EMBEDDING_APPROX_CHARS_PER_TOKEN);
}
@@ -1719,27 +1514,17 @@ export class MemoryIndexManager {
}
private loadEmbeddingCache(hashes: string[]): Map<string, number[]> {
if (!this.cache.enabled) {
return new Map();
}
if (hashes.length === 0) {
return new Map();
}
if (!this.cache.enabled) return new Map();
if (hashes.length === 0) return new Map();
const unique: string[] = [];
const seen = new Set<string>();
for (const hash of hashes) {
if (!hash) {
continue;
}
if (seen.has(hash)) {
continue;
}
if (!hash) continue;
if (seen.has(hash)) continue;
seen.add(hash);
unique.push(hash);
}
if (unique.length === 0) {
return new Map();
}
if (unique.length === 0) return new Map();
const out = new Map<string, number[]>();
const baseParams = [this.provider.id, this.provider.model, this.providerKey];
@@ -1761,12 +1546,8 @@ export class MemoryIndexManager {
}
private upsertEmbeddingCache(entries: Array<{ hash: string; embedding: number[] }>): void {
if (!this.cache.enabled) {
return;
}
if (entries.length === 0) {
return;
}
if (!this.cache.enabled) return;
if (entries.length === 0) return;
const now = Date.now();
const stmt = this.db.prepare(
`INSERT INTO ${EMBEDDING_CACHE_TABLE} (provider, model, provider_key, hash, embedding, dims, updated_at)\n` +
@@ -1791,20 +1572,14 @@ export class MemoryIndexManager {
}
private pruneEmbeddingCacheIfNeeded(): void {
if (!this.cache.enabled) {
return;
}
if (!this.cache.enabled) return;
const max = this.cache.maxEntries;
if (!max || max <= 0) {
return;
}
if (!max || max <= 0) return;
const row = this.db.prepare(`SELECT COUNT(*) as c FROM ${EMBEDDING_CACHE_TABLE}`).get() as
| { c: number }
| undefined;
const count = row?.c ?? 0;
if (count <= max) {
return;
}
if (count <= max) return;
const excess = count - max;
this.db
.prepare(
@@ -1819,9 +1594,7 @@ export class MemoryIndexManager {
}
private async embedChunksInBatches(chunks: MemoryChunk[]): Promise<number[][]> {
if (chunks.length === 0) {
return [];
}
if (chunks.length === 0) return [];
const cached = this.loadEmbeddingCache(chunks.map((chunk) => chunk.hash));
const embeddings: number[][] = Array.from({ length: chunks.length }, () => []);
const missing: Array<{ index: number; chunk: MemoryChunk }> = [];
@@ -1836,9 +1609,7 @@ export class MemoryIndexManager {
}
}
if (missing.length === 0) {
return embeddings;
}
if (missing.length === 0) return embeddings;
const missingChunks = missing.map((m) => m.chunk);
const batches = this.buildEmbeddingBatches(missingChunks);
@@ -1864,7 +1635,7 @@ export class MemoryIndexManager {
if (this.provider.id === "openai" && this.openAi) {
const entries = Object.entries(this.openAi.headers)
.filter(([key]) => key.toLowerCase() !== "authorization")
.toSorted(([a], [b]) => a.localeCompare(b))
.sort(([a], [b]) => a.localeCompare(b))
.map(([key, value]) => [key, value]);
return hashText(
JSON.stringify({
@@ -1881,7 +1652,7 @@ export class MemoryIndexManager {
const lower = key.toLowerCase();
return lower !== "authorization" && lower !== "x-goog-api-key";
})
.toSorted(([a], [b]) => a.localeCompare(b))
.sort(([a], [b]) => a.localeCompare(b))
.map(([key, value]) => [key, value]);
return hashText(
JSON.stringify({
@@ -1918,9 +1689,7 @@ export class MemoryIndexManager {
if (!openAi) {
return this.embedChunksInBatches(chunks);
}
if (chunks.length === 0) {
return [];
}
if (chunks.length === 0) return [];
const cached = this.loadEmbeddingCache(chunks.map((chunk) => chunk.hash));
const embeddings: number[][] = Array.from({ length: chunks.length }, () => []);
const missing: Array<{ index: number; chunk: MemoryChunk }> = [];
@@ -1935,9 +1704,7 @@ export class MemoryIndexManager {
}
}
if (missing.length === 0) {
return embeddings;
}
if (missing.length === 0) return embeddings;
const requests: OpenAiBatchRequest[] = [];
const mapping = new Map<string, { index: number; hash: string }>();
@@ -1972,17 +1739,13 @@ export class MemoryIndexManager {
}),
fallback: async () => await this.embedChunksInBatches(chunks),
});
if (Array.isArray(batchResult)) {
return batchResult;
}
if (Array.isArray(batchResult)) return batchResult;
const byCustomId = batchResult;
const toCache: Array<{ hash: string; embedding: number[] }> = [];
for (const [customId, embedding] of byCustomId.entries()) {
const mapped = mapping.get(customId);
if (!mapped) {
continue;
}
if (!mapped) continue;
embeddings[mapped.index] = embedding;
toCache.push({ hash: mapped.hash, embedding });
}
@@ -1999,9 +1762,7 @@ export class MemoryIndexManager {
if (!gemini) {
return this.embedChunksInBatches(chunks);
}
if (chunks.length === 0) {
return [];
}
if (chunks.length === 0) return [];
const cached = this.loadEmbeddingCache(chunks.map((chunk) => chunk.hash));
const embeddings: number[][] = Array.from({ length: chunks.length }, () => []);
const missing: Array<{ index: number; chunk: MemoryChunk }> = [];
@@ -2016,9 +1777,7 @@ export class MemoryIndexManager {
}
}
if (missing.length === 0) {
return embeddings;
}
if (missing.length === 0) return embeddings;
const requests: GeminiBatchRequest[] = [];
const mapping = new Map<string, { index: number; hash: string }>();
@@ -2050,17 +1809,13 @@ export class MemoryIndexManager {
}),
fallback: async () => await this.embedChunksInBatches(chunks),
});
if (Array.isArray(batchResult)) {
return batchResult;
}
if (Array.isArray(batchResult)) return batchResult;
const byCustomId = batchResult;
const toCache: Array<{ hash: string; embedding: number[] }> = [];
for (const [customId, embedding] of byCustomId.entries()) {
const mapped = mapping.get(customId);
if (!mapped) {
continue;
}
if (!mapped) continue;
embeddings[mapped.index] = embedding;
toCache.push({ hash: mapped.hash, embedding });
}
@@ -2069,9 +1824,7 @@ export class MemoryIndexManager {
}
private async embedBatchWithRetry(texts: string[]): Promise<number[][]> {
if (texts.length === 0) {
return [];
}
if (texts.length === 0) return [];
let attempt = 0;
let delayMs = EMBEDDING_RETRY_BASE_DELAY_MS;
while (true) {
@@ -2133,9 +1886,7 @@ export class MemoryIndexManager {
timeoutMs: number,
message: string,
): Promise<T> {
if (!Number.isFinite(timeoutMs) || timeoutMs <= 0) {
return await promise;
}
if (!Number.isFinite(timeoutMs) || timeoutMs <= 0) return await promise;
let timer: NodeJS.Timeout | null = null;
const timeoutPromise = new Promise<never>((_, reject) => {
timer = setTimeout(() => reject(new Error(message)), timeoutMs);
@@ -2143,16 +1894,12 @@ export class MemoryIndexManager {
try {
return (await Promise.race([promise, timeoutPromise])) as T;
} finally {
if (timer) {
clearTimeout(timer);
}
if (timer) clearTimeout(timer);
}
}
private async runWithConcurrency<T>(tasks: Array<() => Promise<T>>, limit: number): Promise<T[]> {
if (tasks.length === 0) {
return [];
}
if (tasks.length === 0) return [];
const resolvedLimit = Math.max(1, Math.min(limit, tasks.length));
const results: T[] = Array.from({ length: tasks.length });
let next = 0;
@@ -2160,14 +1907,10 @@ export class MemoryIndexManager {
const workers = Array.from({ length: resolvedLimit }, async () => {
while (true) {
if (firstError) {
return;
}
if (firstError) return;
const index = next;
next += 1;
if (index >= tasks.length) {
return;
}
if (index >= tasks.length) return;
try {
results[index] = await tasks[index]();
} catch (err) {
@@ -2178,9 +1921,7 @@ export class MemoryIndexManager {
});
await Promise.allSettled(workers);
if (firstError) {
throw firstError;
}
if (firstError) throw firstError;
return results;
}
+612
View File
@@ -0,0 +1,612 @@
import { spawn } from "node:child_process";
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import YAML from "yaml";
import type { MoltbotConfig } from "../config/config.js";
import { resolveStateDir } from "../config/paths.js";
import { resolveAgentWorkspaceDir } from "../agents/agent-scope.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import {
listSessionFilesForAgent,
buildSessionEntry,
type SessionFileEntry,
} from "./session-files.js";
import { requireNodeSqlite } from "./sqlite.js";
import type {
MemoryProviderStatus,
MemorySearchManager,
MemorySearchResult,
MemorySource,
MemorySyncProgressUpdate,
} from "./types.js";
import type { ResolvedMemoryBackendConfig, ResolvedQmdConfig } from "./backend-config.js";
const log = createSubsystemLogger("memory");
const SNIPPET_HEADER_RE = /@@\s*-([0-9]+),([0-9]+)/;
type QmdQueryResult = {
docid?: string;
score?: number;
file?: string;
snippet?: string;
body?: string;
};
type CollectionRoot = {
path: string;
kind: MemorySource;
};
type SessionExporterConfig = {
dir: string;
retentionMs?: number;
collectionName: string;
};
export class QmdMemoryManager implements MemorySearchManager {
static async create(params: {
cfg: MoltbotConfig;
agentId: string;
resolved: ResolvedMemoryBackendConfig;
}): Promise<QmdMemoryManager | null> {
const resolved = params.resolved.qmd;
if (!resolved) return null;
const manager = new QmdMemoryManager({ cfg: params.cfg, agentId: params.agentId, resolved });
await manager.initialize();
return manager;
}
private readonly cfg: MoltbotConfig;
private readonly agentId: string;
private readonly qmd: ResolvedQmdConfig;
private readonly workspaceDir: string;
private readonly stateDir: string;
private readonly agentStateDir: string;
private readonly qmdDir: string;
private readonly cacheDir: string;
private readonly configDir: string;
private readonly xdgConfigHome: string;
private readonly xdgCacheHome: string;
private readonly collectionsFile: string;
private readonly indexPath: string;
private readonly env: NodeJS.ProcessEnv;
private readonly collectionRoots = new Map<string, CollectionRoot>();
private readonly sources = new Set<MemorySource>();
private readonly docPathCache = new Map<
string,
{ rel: string; abs: string; source: MemorySource }
>();
private readonly sessionExporter: SessionExporterConfig | null;
private updateTimer: NodeJS.Timeout | null = null;
private pendingUpdate: Promise<void> | null = null;
private closed = false;
private db: import("node:sqlite").DatabaseSync | null = null;
private lastUpdateAt: number | null = null;
private constructor(params: {
cfg: MoltbotConfig;
agentId: string;
resolved: ResolvedQmdConfig;
}) {
this.cfg = params.cfg;
this.agentId = params.agentId;
this.qmd = params.resolved;
this.workspaceDir = resolveAgentWorkspaceDir(params.cfg, params.agentId);
this.stateDir = resolveStateDir(process.env, os.homedir);
this.agentStateDir = path.join(this.stateDir, "agents", this.agentId);
this.qmdDir = path.join(this.agentStateDir, "qmd");
this.cacheDir = path.join(this.qmdDir, "cache");
this.configDir = path.join(this.qmdDir, "config");
this.xdgConfigHome = path.join(this.qmdDir, "xdg-config");
this.xdgCacheHome = path.join(this.qmdDir, "xdg-cache");
this.collectionsFile = path.join(this.configDir, "index.yml");
this.indexPath = path.join(this.cacheDir, "index.sqlite");
this.env = {
...process.env,
QMD_CONFIG_DIR: this.configDir,
XDG_CONFIG_HOME: this.xdgConfigHome,
XDG_CACHE_HOME: this.xdgCacheHome,
INDEX_PATH: this.indexPath,
NO_COLOR: "1",
};
this.sessionExporter = this.qmd.sessions.enabled
? {
dir: this.qmd.sessions.exportDir ?? path.join(this.qmdDir, "sessions"),
retentionMs: this.qmd.sessions.retentionDays
? this.qmd.sessions.retentionDays * 24 * 60 * 60 * 1000
: undefined,
collectionName: this.pickSessionCollectionName(),
}
: null;
if (this.sessionExporter) {
this.qmd.collections = [
...this.qmd.collections,
{
name: this.sessionExporter.collectionName,
path: this.sessionExporter.dir,
pattern: "**/*.md",
kind: "sessions",
},
];
}
}
private async initialize(): Promise<void> {
await fs.mkdir(this.cacheDir, { recursive: true });
await fs.mkdir(this.configDir, { recursive: true });
await fs.mkdir(this.xdgConfigHome, { recursive: true });
await fs.mkdir(this.xdgCacheHome, { recursive: true });
this.bootstrapCollections();
await this.writeCollectionsConfig();
if (this.qmd.update.onBoot) {
await this.runUpdate("boot", true);
}
if (this.qmd.update.intervalMs > 0) {
this.updateTimer = setInterval(() => {
void this.runUpdate("interval").catch((err) => {
log.warn(`qmd update failed (${String(err)})`);
});
}, this.qmd.update.intervalMs);
}
}
private bootstrapCollections(): void {
this.collectionRoots.clear();
this.sources.clear();
for (const collection of this.qmd.collections) {
const kind: MemorySource = collection.kind === "sessions" ? "sessions" : "memory";
this.collectionRoots.set(collection.name, { path: collection.path, kind });
this.sources.add(kind);
}
}
private async writeCollectionsConfig(): Promise<void> {
const collections: Record<string, { path: string; pattern: string }> = {};
for (const collection of this.qmd.collections) {
collections[collection.name] = {
path: collection.path,
pattern: collection.pattern,
};
}
const yaml = YAML.stringify({ collections }, { indent: 2, lineWidth: 0 });
await fs.writeFile(this.collectionsFile, yaml, "utf-8");
}
async search(
query: string,
opts?: { maxResults?: number; minScore?: number; sessionKey?: string },
): Promise<MemorySearchResult[]> {
if (!this.isScopeAllowed(opts?.sessionKey)) return [];
const trimmed = query.trim();
if (!trimmed) return [];
await this.pendingUpdate?.catch(() => undefined);
const limit = Math.min(
this.qmd.limits.maxResults,
opts?.maxResults ?? this.qmd.limits.maxResults,
);
const args = ["query", trimmed, "--json", "-n", String(limit)];
let stdout: string;
try {
const result = await this.runQmd(args, { timeoutMs: this.qmd.limits.timeoutMs });
stdout = result.stdout;
} catch (err) {
log.warn(`qmd query failed: ${String(err)}`);
throw err instanceof Error ? err : new Error(String(err));
}
let parsed: QmdQueryResult[] = [];
try {
parsed = JSON.parse(stdout);
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
log.warn(`qmd query returned invalid JSON: ${message}`);
throw new Error(`qmd query returned invalid JSON: ${message}`);
}
const results: MemorySearchResult[] = [];
for (const entry of parsed) {
const doc = await this.resolveDocLocation(entry.docid);
if (!doc) continue;
const snippet = entry.snippet?.slice(0, this.qmd.limits.maxSnippetChars) ?? "";
const lines = this.extractSnippetLines(snippet);
const score = typeof entry.score === "number" ? entry.score : 0;
const minScore = opts?.minScore ?? 0;
if (score < minScore) continue;
results.push({
path: doc.rel,
startLine: lines.startLine,
endLine: lines.endLine,
score,
snippet,
source: doc.source,
});
}
return results.slice(0, limit);
}
async sync(params?: {
reason?: string;
force?: boolean;
progress?: (update: MemorySyncProgressUpdate) => void;
}): Promise<void> {
if (params?.progress) {
params.progress({ completed: 0, total: 1, label: "Updating QMD index…" });
}
await this.runUpdate(params?.reason ?? "manual", params?.force);
if (params?.progress) {
params.progress({ completed: 1, total: 1, label: "QMD index updated" });
}
}
async readFile(params: {
relPath: string;
from?: number;
lines?: number;
}): Promise<{ text: string; path: string }> {
const relPath = params.relPath?.trim();
if (!relPath) throw new Error("path required");
const absPath = this.resolveReadPath(relPath);
const content = await fs.readFile(absPath, "utf-8");
if (!params.from && !params.lines) {
return { text: content, path: relPath };
}
const lines = content.split("\n");
const start = Math.max(1, params.from ?? 1);
const count = Math.max(1, params.lines ?? lines.length);
const slice = lines.slice(start - 1, start - 1 + count);
return { text: slice.join("\n"), path: relPath };
}
status(): MemoryProviderStatus {
const counts = this.readCounts();
return {
backend: "qmd",
provider: "qmd",
model: "qmd",
requestedProvider: "qmd",
files: counts.totalDocuments,
chunks: counts.totalDocuments,
dirty: false,
workspaceDir: this.workspaceDir,
dbPath: this.indexPath,
sources: Array.from(this.sources),
sourceCounts: counts.sourceCounts,
vector: { enabled: true, available: true },
batch: {
enabled: false,
failures: 0,
limit: 0,
wait: false,
concurrency: 0,
pollIntervalMs: 0,
timeoutMs: 0,
},
custom: {
qmd: {
collections: this.qmd.collections.length,
lastUpdateAt: this.lastUpdateAt,
},
},
};
}
async probeVectorAvailability(): Promise<boolean> {
return true;
}
async close(): Promise<void> {
if (this.closed) return;
this.closed = true;
if (this.updateTimer) {
clearInterval(this.updateTimer);
this.updateTimer = null;
}
await this.pendingUpdate?.catch(() => undefined);
if (this.db) {
this.db.close();
this.db = null;
}
}
private async runUpdate(reason: string, force?: boolean): Promise<void> {
if (this.pendingUpdate && !force) return this.pendingUpdate;
const run = async () => {
if (this.sessionExporter) {
await this.exportSessions();
}
await this.runQmd(["update"], { timeoutMs: 120_000 });
try {
await this.runQmd(["embed"], { timeoutMs: 120_000 });
} catch (err) {
log.warn(`qmd embed failed (${reason}): ${String(err)}`);
}
this.lastUpdateAt = Date.now();
this.docPathCache.clear();
};
this.pendingUpdate = run().finally(() => {
this.pendingUpdate = null;
});
await this.pendingUpdate;
}
private async runQmd(
args: string[],
opts?: { timeoutMs?: number },
): Promise<{ stdout: string; stderr: string }> {
return await new Promise((resolve, reject) => {
const child = spawn(this.qmd.command, args, {
env: this.env,
cwd: this.workspaceDir,
});
let stdout = "";
let stderr = "";
const timer = opts?.timeoutMs
? setTimeout(() => {
child.kill("SIGKILL");
reject(new Error(`qmd ${args.join(" ")} timed out after ${opts.timeoutMs}ms`));
}, opts.timeoutMs)
: null;
child.stdout.on("data", (data) => {
stdout += data.toString();
});
child.stderr.on("data", (data) => {
stderr += data.toString();
});
child.on("error", (err) => {
if (timer) clearTimeout(timer);
reject(err);
});
child.on("close", (code) => {
if (timer) clearTimeout(timer);
if (code === 0) {
resolve({ stdout, stderr });
} else {
reject(new Error(`qmd ${args.join(" ")} failed (code ${code}): ${stderr || stdout}`));
}
});
});
}
private ensureDb() {
if (this.db) return this.db;
const sqlite = requireNodeSqlite();
this.db = sqlite.open(this.indexPath, { readonly: true });
return this.db;
}
private async exportSessions(): Promise<void> {
if (!this.sessionExporter) return;
const exportDir = this.sessionExporter.dir;
await fs.mkdir(exportDir, { recursive: true });
const files = await listSessionFilesForAgent(this.agentId);
const keep = new Set<string>();
const cutoff = this.sessionExporter.retentionMs
? Date.now() - this.sessionExporter.retentionMs
: null;
for (const sessionFile of files) {
const entry = await buildSessionEntry(sessionFile);
if (!entry) continue;
if (cutoff && entry.mtimeMs < cutoff) continue;
const target = path.join(exportDir, `${path.basename(sessionFile, ".jsonl")}.md`);
await fs.writeFile(target, this.renderSessionMarkdown(entry), "utf-8");
keep.add(target);
}
const exported = await fs.readdir(exportDir).catch(() => []);
for (const name of exported) {
if (!name.endsWith(".md")) continue;
const full = path.join(exportDir, name);
if (!keep.has(full)) {
await fs.rm(full, { force: true });
}
}
}
private renderSessionMarkdown(entry: SessionFileEntry): string {
const header = `# Session ${path.basename(entry.absPath, path.extname(entry.absPath))}`;
const body = entry.content?.trim().length ? entry.content.trim() : "(empty)";
return `${header}\n\n${body}\n`;
}
private pickSessionCollectionName(): string {
const existing = new Set(this.qmd.collections.map((collection) => collection.name));
if (!existing.has("sessions")) return "sessions";
let counter = 2;
let candidate = `sessions-${counter}`;
while (existing.has(candidate)) {
counter += 1;
candidate = `sessions-${counter}`;
}
return candidate;
}
private async resolveDocLocation(
docid?: string,
): Promise<{ rel: string; abs: string; source: MemorySource } | null> {
if (!docid) return null;
const normalized = docid.startsWith("#") ? docid.slice(1) : docid;
if (!normalized) return null;
const cached = this.docPathCache.get(normalized);
if (cached) return cached;
const db = this.ensureDb();
const row = db
.prepare("SELECT collection, path FROM documents WHERE hash LIKE ? AND active = 1 LIMIT 1")
.get(`${normalized}%`) as { collection: string; path: string } | undefined;
if (!row) return null;
const location = this.toDocLocation(row.collection, row.path);
if (!location) return null;
this.docPathCache.set(normalized, location);
return location;
}
private extractSnippetLines(snippet: string): { startLine: number; endLine: number } {
const match = SNIPPET_HEADER_RE.exec(snippet);
if (match) {
const start = Number(match[1]);
const count = Number(match[2]);
if (Number.isFinite(start) && Number.isFinite(count)) {
return { startLine: start, endLine: start + count - 1 };
}
}
const lines = snippet.split("\n").length;
return { startLine: 1, endLine: lines };
}
private readCounts(): {
totalDocuments: number;
sourceCounts: Array<{ source: MemorySource; files: number; chunks: number }>;
} {
try {
const db = this.ensureDb();
const rows = db
.prepare(
"SELECT collection, COUNT(*) as c FROM documents WHERE active = 1 GROUP BY collection",
)
.all() as Array<{ collection: string; c: number }>;
const bySource = new Map<MemorySource, { files: number; chunks: number }>();
for (const source of this.sources) {
bySource.set(source, { files: 0, chunks: 0 });
}
let total = 0;
for (const row of rows) {
const root = this.collectionRoots.get(row.collection);
const source = root?.kind ?? "memory";
const entry = bySource.get(source) ?? { files: 0, chunks: 0 };
entry.files += row.c ?? 0;
entry.chunks += row.c ?? 0;
bySource.set(source, entry);
total += row.c ?? 0;
}
return {
totalDocuments: total,
sourceCounts: Array.from(bySource.entries()).map(([source, value]) => ({
source,
files: value.files,
chunks: value.chunks,
})),
};
} catch (err) {
log.warn(`failed to read qmd index stats: ${String(err)}`);
return {
totalDocuments: 0,
sourceCounts: Array.from(this.sources).map((source) => ({ source, files: 0, chunks: 0 })),
};
}
}
private isScopeAllowed(sessionKey?: string): boolean {
const scope = this.qmd.scope;
if (!scope) return true;
const channel = this.deriveChannelFromKey(sessionKey);
const chatType = this.deriveChatTypeFromKey(sessionKey);
const normalizedKey = sessionKey ?? "";
for (const rule of scope.rules ?? []) {
if (!rule) continue;
const match = rule.match ?? {};
if (match.channel && match.channel !== channel) continue;
if (match.chatType && match.chatType !== chatType) continue;
if (match.keyPrefix && !normalizedKey.startsWith(match.keyPrefix)) continue;
return rule.action === "allow";
}
const fallback = scope.default ?? "allow";
return fallback === "allow";
}
private deriveChannelFromKey(key?: string) {
if (!key) return undefined;
const parts = key.split(":").filter(Boolean);
if (parts.length >= 3 && (parts[1] === "group" || parts[1] === "channel")) {
return parts[0]?.toLowerCase();
}
return undefined;
}
private deriveChatTypeFromKey(key?: string) {
if (!key) return undefined;
if (key.includes(":group:")) return "group";
if (key.includes(":channel:")) return "channel";
return "direct";
}
private toDocLocation(
collection: string,
collectionRelativePath: string,
): { rel: string; abs: string; source: MemorySource } | null {
const root = this.collectionRoots.get(collection);
if (!root) return null;
const normalizedRelative = collectionRelativePath.replace(/\\/g, "/");
const absPath = path.normalize(path.resolve(root.path, collectionRelativePath));
const relativeToWorkspace = path.relative(this.workspaceDir, absPath);
const relPath = this.buildSearchPath(
collection,
normalizedRelative,
relativeToWorkspace,
absPath,
);
return { rel: relPath, abs: absPath, source: root.kind };
}
private buildSearchPath(
collection: string,
collectionRelativePath: string,
relativeToWorkspace: string,
absPath: string,
): string {
const insideWorkspace = this.isInsideWorkspace(relativeToWorkspace);
if (insideWorkspace) {
const normalized = relativeToWorkspace.replace(/\\/g, "/");
if (!normalized) return path.basename(absPath);
return normalized;
}
const sanitized = collectionRelativePath.replace(/^\/+/, "");
return `qmd/${collection}/${sanitized}`;
}
private isInsideWorkspace(relativePath: string): boolean {
if (!relativePath) return true;
if (relativePath.startsWith("..")) return false;
if (relativePath.startsWith(`..${path.sep}`)) return false;
return !path.isAbsolute(relativePath);
}
private resolveReadPath(relPath: string): string {
if (relPath.startsWith("qmd/")) {
const [, collection, ...rest] = relPath.split("/");
if (!collection || rest.length === 0) {
throw new Error("invalid qmd path");
}
const root = this.collectionRoots.get(collection);
if (!root) throw new Error(`unknown qmd collection: ${collection}`);
const joined = rest.join("/");
const resolved = path.resolve(root.path, joined);
if (!this.isWithinRoot(root.path, resolved)) {
throw new Error("qmd path escapes collection");
}
return resolved;
}
const absPath = path.resolve(this.workspaceDir, relPath);
if (!this.isWithinWorkspace(absPath)) {
throw new Error("path escapes workspace");
}
return absPath;
}
private isWithinWorkspace(absPath: string): boolean {
const normalizedWorkspace = this.workspaceDir.endsWith(path.sep)
? this.workspaceDir
: `${this.workspaceDir}${path.sep}`;
if (absPath === this.workspaceDir) return true;
const candidate = absPath.endsWith(path.sep) ? absPath : `${absPath}${path.sep}`;
return candidate.startsWith(normalizedWorkspace);
}
private isWithinRoot(root: string, candidate: string): boolean {
const normalizedRoot = root.endsWith(path.sep) ? root : `${root}${path.sep}`;
if (candidate === root) return true;
const next = candidate.endsWith(path.sep) ? candidate : `${candidate}${path.sep}`;
return next.startsWith(normalizedRoot);
}
}
+62
View File
@@ -0,0 +1,62 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
const mockPrimary = {
search: vi.fn(async () => []),
readFile: vi.fn(async () => ({ text: "", path: "MEMORY.md" })),
status: vi.fn(() => ({
backend: "qmd" as const,
provider: "qmd",
model: "qmd",
requestedProvider: "qmd",
files: 0,
chunks: 0,
dirty: false,
workspaceDir: "/tmp",
dbPath: "/tmp/index.sqlite",
sources: ["memory" as const],
sourceCounts: [{ source: "memory" as const, files: 0, chunks: 0 }],
})),
sync: vi.fn(async () => {}),
probeVectorAvailability: vi.fn(async () => true),
close: vi.fn(async () => {}),
};
vi.mock("./qmd-manager.js", () => ({
QmdMemoryManager: {
create: vi.fn(async () => mockPrimary),
},
}));
vi.mock("./manager.js", () => ({
MemoryIndexManager: {
get: vi.fn(async () => null),
},
}));
import { QmdMemoryManager } from "./qmd-manager.js";
import { getMemorySearchManager } from "./search-manager.js";
beforeEach(() => {
mockPrimary.search.mockClear();
mockPrimary.readFile.mockClear();
mockPrimary.status.mockClear();
mockPrimary.sync.mockClear();
mockPrimary.probeVectorAvailability.mockClear();
mockPrimary.close.mockClear();
QmdMemoryManager.create.mockClear();
});
describe("getMemorySearchManager caching", () => {
it("reuses the same QMD manager instance for repeated calls", async () => {
const cfg = {
memory: { backend: "qmd", qmd: {} },
agents: { list: [{ id: "main", default: true, workspace: "/tmp/workspace" }] },
} as const;
const first = await getMemorySearchManager({ cfg, agentId: "main" });
const second = await getMemorySearchManager({ cfg, agentId: "main" });
expect(first.manager).toBe(second.manager);
expect(QmdMemoryManager.create).toHaveBeenCalledTimes(1);
});
});
+158 -3
View File
@@ -1,15 +1,56 @@
import type { OpenClawConfig } from "../config/config.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import type { MoltbotConfig } from "../config/config.js";
import { resolveMemoryBackendConfig } from "./backend-config.js";
import type { ResolvedQmdConfig } from "./backend-config.js";
import type { MemoryIndexManager } from "./manager.js";
import type { MemorySearchManager, MemorySyncProgressUpdate } from "./types.js";
const log = createSubsystemLogger("memory");
const QMD_MANAGER_CACHE = new Map<string, MemorySearchManager>();
export type MemorySearchManagerResult = {
manager: MemoryIndexManager | null;
manager: MemorySearchManager | null;
error?: string;
};
export async function getMemorySearchManager(params: {
cfg: OpenClawConfig;
cfg: MoltbotConfig;
agentId: string;
}): Promise<MemorySearchManagerResult> {
const resolved = resolveMemoryBackendConfig(params);
if (resolved.backend === "qmd" && resolved.qmd) {
const cacheKey = buildQmdCacheKey(params.agentId, resolved.qmd);
const cached = QMD_MANAGER_CACHE.get(cacheKey);
if (cached) {
return { manager: cached };
}
try {
const { QmdMemoryManager } = await import("./qmd-manager.js");
const primary = await QmdMemoryManager.create({
cfg: params.cfg,
agentId: params.agentId,
resolved,
});
if (primary) {
const wrapper = new FallbackMemoryManager(
{
primary,
fallbackFactory: async () => {
const { MemoryIndexManager } = await import("./manager.js");
return await MemoryIndexManager.get(params);
},
},
() => QMD_MANAGER_CACHE.delete(cacheKey),
);
QMD_MANAGER_CACHE.set(cacheKey, wrapper);
return { manager: wrapper };
}
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
log.warn(`qmd memory unavailable; falling back to builtin: ${message}`);
}
}
try {
const { MemoryIndexManager } = await import("./manager.js");
const manager = await MemoryIndexManager.get(params);
@@ -19,3 +60,117 @@ export async function getMemorySearchManager(params: {
return { manager: null, error: message };
}
}
class FallbackMemoryManager implements MemorySearchManager {
private fallback: MemorySearchManager | null = null;
private primaryFailed = false;
private lastError?: string;
constructor(
private readonly deps: {
primary: MemorySearchManager;
fallbackFactory: () => Promise<MemorySearchManager | null>;
},
private readonly onClose?: () => void,
) {}
async search(
query: string,
opts?: { maxResults?: number; minScore?: number; sessionKey?: string },
) {
if (!this.primaryFailed) {
try {
return await this.deps.primary.search(query, opts);
} catch (err) {
this.primaryFailed = true;
this.lastError = err instanceof Error ? err.message : String(err);
log.warn(`qmd memory failed; switching to builtin index: ${this.lastError}`);
await this.deps.primary.close?.().catch(() => {});
}
}
const fallback = await this.ensureFallback();
if (fallback) {
return await fallback.search(query, opts);
}
throw new Error(this.lastError ?? "memory search unavailable");
}
async readFile(params: { relPath: string; from?: number; lines?: number }) {
if (!this.primaryFailed) {
return await this.deps.primary.readFile(params);
}
const fallback = await this.ensureFallback();
if (fallback) {
return await fallback.readFile(params);
}
throw new Error(this.lastError ?? "memory read unavailable");
}
status() {
if (!this.primaryFailed) {
return this.deps.primary.status();
}
const fallbackStatus = this.fallback?.status();
if (fallbackStatus) {
const custom = fallbackStatus.custom ?? {};
return {
...fallbackStatus,
custom: {
...custom,
fallback: { disabled: true, reason: this.lastError ?? "unknown" },
},
};
}
const primaryStatus = this.deps.primary.status();
const custom = primaryStatus.custom ?? {};
return {
...primaryStatus,
custom: {
...custom,
fallback: { disabled: true, reason: this.lastError ?? "unknown" },
},
};
}
async sync(params?: {
reason?: string;
force?: boolean;
progress?: (update: MemorySyncProgressUpdate) => void;
}) {
if (!this.primaryFailed) {
await this.deps.primary.sync?.(params);
return;
}
const fallback = await this.ensureFallback();
await fallback?.sync?.(params);
}
async probeVectorAvailability() {
if (!this.primaryFailed) {
return await this.deps.primary.probeVectorAvailability();
}
const fallback = await this.ensureFallback();
return (await fallback?.probeVectorAvailability()) ?? false;
}
async close() {
await this.deps.primary.close?.();
await this.fallback?.close?.();
this.onClose?.();
}
private async ensureFallback(): Promise<MemorySearchManager | null> {
if (this.fallback) return this.fallback;
const fallback = await this.deps.fallbackFactory();
if (!fallback) {
log.warn("memory fallback requested but builtin index is unavailable");
return null;
}
this.fallback = fallback;
return this.fallback;
}
}
function buildQmdCacheKey(agentId: string, config: ResolvedQmdConfig): string {
return `${agentId}:${JSON.stringify(config)}`;
}
+72
View File
@@ -0,0 +1,72 @@
export type MemorySource = "memory" | "sessions";
export type MemorySearchResult = {
path: string;
startLine: number;
endLine: number;
score: number;
snippet: string;
source: MemorySource;
citation?: string;
};
export type MemorySyncProgressUpdate = {
completed: number;
total: number;
label?: string;
};
export type MemoryProviderStatus = {
backend: "builtin" | "qmd";
provider: string;
model?: string;
requestedProvider?: string;
files?: number;
chunks?: number;
dirty?: boolean;
workspaceDir?: string;
dbPath?: string;
sources?: MemorySource[];
cache?: { enabled: boolean; entries?: number; maxEntries?: number };
fts?: { enabled: boolean; available: boolean; error?: string };
fallback?: { from: string; reason?: string };
vector?: {
enabled: boolean;
available?: boolean;
extensionPath?: string;
loadError?: string;
dims?: number;
};
batch?: {
enabled: boolean;
failures: number;
limit: number;
wait: boolean;
concurrency: number;
pollIntervalMs: number;
timeoutMs: number;
lastError?: string;
lastProvider?: string;
};
custom?: Record<string, unknown>;
};
export interface MemorySearchManager {
search(
query: string,
opts?: { maxResults?: number; minScore?: number; sessionKey?: string },
): Promise<MemorySearchResult[]>;
readFile(params: {
relPath: string;
from?: number;
lines?: number;
}): Promise<{ text: string; path: string }>;
status(): MemoryProviderStatus;
sync?(params?: {
reason?: string;
force?: boolean;
progress?: (update: MemorySyncProgressUpdate) => void;
}): Promise<void>;
probeVectorAvailability(): Promise<boolean>;
close?(): Promise<void>;
}