Gateway: discriminated protocol schema + CLI updates

This commit is contained in:
Peter Steinberger
2025-12-09 15:01:13 +01:00
parent 2746efeb25
commit 172ce6c79f
23 changed files with 2001 additions and 477 deletions
+13 -3
View File
@@ -58,7 +58,7 @@ describe("cli program", () => {
const program = buildProgram();
await program.parseAsync(
[
"relay",
"relay-legacy",
"--web-heartbeat",
"90",
"--heartbeat-now",
@@ -86,7 +86,7 @@ describe("cli program", () => {
const program = buildProgram();
const prev = process.env.TELEGRAM_BOT_TOKEN;
process.env.TELEGRAM_BOT_TOKEN = "token123";
await program.parseAsync(["relay", "--provider", "telegram"], {
await program.parseAsync(["relay-legacy", "--provider", "telegram"], {
from: "user",
});
expect(monitorTelegramProvider).toHaveBeenCalledWith(
@@ -101,7 +101,7 @@ describe("cli program", () => {
const prev = process.env.TELEGRAM_BOT_TOKEN;
process.env.TELEGRAM_BOT_TOKEN = "";
await expect(
program.parseAsync(["relay", "--provider", "telegram"], {
program.parseAsync(["relay-legacy", "--provider", "telegram"], {
from: "user",
}),
).rejects.toThrow();
@@ -110,6 +110,16 @@ describe("cli program", () => {
process.env.TELEGRAM_BOT_TOKEN = prev;
});
it("relay command is deprecated", async () => {
const program = buildProgram();
await expect(
program.parseAsync(["relay"], { from: "user" }),
).rejects.toThrow("exit");
expect(runtime.error).toHaveBeenCalled();
expect(runtime.exit).toHaveBeenCalledWith(1);
expect(monitorWebProvider).not.toHaveBeenCalled();
});
it("runs status command", async () => {
const program = buildProgram();
await program.parseAsync(["status"], { from: "user" });
+70 -5
View File
@@ -5,9 +5,9 @@ import { healthCommand } from "../commands/health.js";
import { sendCommand } from "../commands/send.js";
import { sessionsCommand } from "../commands/sessions.js";
import { statusCommand } from "../commands/status.js";
import { startGatewayServer } from "../gateway/server.js";
import { callGateway, randomIdempotencyKey } from "../gateway/call.js";
import { loadConfig } from "../config/config.js";
import { callGateway, randomIdempotencyKey } from "../gateway/call.js";
import { startGatewayServer } from "../gateway/server.js";
import { danger, info, setVerbose } from "../globals.js";
import { acquireRelayLock, RelayLockError } from "../infra/relay-lock.js";
import { getResolvedLoggerSettings } from "../logging.js";
@@ -17,7 +17,6 @@ import {
monitorWebProvider,
resolveHeartbeatRecipients,
runWebHeartbeatOnce,
setHeartbeatsEnabled,
type WebMonitorTuning,
} from "../provider-web.js";
import { runRpcLoop } from "../rpc/loop.js";
@@ -364,13 +363,16 @@ Examples:
.option("--url <url>", "Gateway WebSocket URL", "ws://127.0.0.1:18789")
.option("--token <token>", "Gateway token (if required)")
.option("--timeout <ms>", "Timeout in ms", "10000")
.option("--expect-final", "Wait for final response (agent)" , false);
.option("--expect-final", "Wait for final response (agent)", false);
gatewayCallOpts(
program
.command("gw:call")
.description("Call a Gateway method over WS and print JSON")
.argument("<method>", "Method name (health/status/system-presence/send/agent)")
.argument(
"<method>",
"Method name (health/status/system-presence/send/agent)",
)
.option("--params <json>", "JSON object string for params", "{}")
.action(async (method, opts) => {
try {
@@ -560,6 +562,69 @@ Examples:
clawdis relay --heartbeat-now # send immediate agent heartbeat on start (web)
clawdis relay --web-heartbeat 60 # override WhatsApp heartbeat interval
# Troubleshooting: docs/refactor/web-relay-troubleshooting.md
`,
)
.action(async (_opts) => {
defaultRuntime.error(
danger(
"`clawdis relay` is deprecated. Use the WebSocket Gateway (`clawdis gateway`) plus gw:* commands or WebChat/mac app clients.",
),
);
defaultRuntime.exit(1);
});
// relay is deprecated; gateway is the single entry point.
program
.command("relay-legacy")
.description(
"(Deprecated) legacy relay for web/telegram; use `gateway` instead",
)
.option(
"--provider <auto|web|telegram|all>",
"Which providers to start: auto (default), web, telegram, or all",
)
.option(
"--web-heartbeat <seconds>",
"Heartbeat interval for web relay health logs (seconds)",
)
.option(
"--web-retries <count>",
"Max consecutive web reconnect attempts before exit (0 = unlimited)",
)
.option(
"--web-retry-initial <ms>",
"Initial reconnect backoff for web relay (ms)",
)
.option("--web-retry-max <ms>", "Max reconnect backoff for web relay (ms)")
.option(
"--heartbeat-now",
"Run a heartbeat immediately when relay starts",
false,
)
.option(
"--webhook",
"Run Telegram webhook server instead of long-poll",
false,
)
.option(
"--webhook-path <path>",
"Telegram webhook path (default /telegram-webhook when webhook enabled)",
)
.option(
"--webhook-secret <secret>",
"Secret token to verify Telegram webhook requests",
)
.option("--port <port>", "Port for Telegram webhook server (default 8787)")
.option(
"--webhook-url <url>",
"Public Telegram webhook URL to register (overrides localhost autodetect)",
)
.option("--verbose", "Verbose logging", false)
.addHelpText(
"after",
`
This command is legacy and will be removed. Prefer the Gateway.
`,
)
.action(async (opts) => {
+3 -1
View File
@@ -17,7 +17,9 @@ export type CallGatewayOptions = {
maxProtocol?: number;
};
export async function callGateway<T = unknown>(opts: CallGatewayOptions): Promise<T> {
export async function callGateway<T = unknown>(
opts: CallGatewayOptions,
): Promise<T> {
const timeoutMs = opts.timeoutMs ?? 10_000;
return await new Promise<T>((resolve, reject) => {
let settled = false;
+6 -4
View File
@@ -5,13 +5,14 @@ import {
type EventFrame,
type Hello,
type HelloOk,
PROTOCOL_VERSION,
type RequestFrame,
validateRequestFrame,
} from "./protocol/index.js";
type Pending = {
resolve: (value: any) => void;
reject: (err: Error) => void;
reject: (err: any) => void;
expectFinal: boolean;
};
@@ -73,8 +74,8 @@ export class GatewayClient {
private sendHello() {
const hello: Hello = {
type: "hello",
minProtocol: this.opts.minProtocol ?? 1,
maxProtocol: this.opts.maxProtocol ?? 1,
minProtocol: this.opts.minProtocol ?? PROTOCOL_VERSION,
maxProtocol: this.opts.maxProtocol ?? PROTOCOL_VERSION,
client: {
name: this.opts.clientName ?? "webchat-backend",
version: this.opts.clientVersion ?? "dev",
@@ -123,7 +124,8 @@ export class GatewayClient {
}
this.pending.delete(parsed.id);
if (parsed.ok) pending.resolve(parsed.payload);
else pending.reject(new Error(parsed.error?.message ?? "unknown error"));
else
pending.reject(new Error(parsed.error?.message ?? "unknown error"));
}
} catch (err) {
logDebug(`gateway client parse error: ${String(err)}`);
+38 -17
View File
@@ -1,46 +1,60 @@
import AjvPkg, { type ErrorObject } from "ajv";
import {
type AgentEvent,
AgentEventSchema,
AgentParamsSchema,
ErrorCodes,
ErrorShapeSchema,
EventFrameSchema,
HelloErrorSchema,
HelloOkSchema,
HelloSchema,
PresenceEntrySchema,
ProtocolSchemas,
RequestFrameSchema,
ResponseFrameSchema,
SendParamsSchema,
SnapshotSchema,
StateVersionSchema,
errorShape,
type AgentEvent,
type ErrorShape,
ErrorShapeSchema,
type EventFrame,
EventFrameSchema,
errorShape,
type Hello,
type HelloError,
HelloErrorSchema,
type HelloOk,
HelloOkSchema,
HelloSchema,
type PresenceEntry,
PresenceEntrySchema,
ProtocolSchemas,
PROTOCOL_VERSION,
type RequestFrame,
RequestFrameSchema,
type ResponseFrame,
ResponseFrameSchema,
SendParamsSchema,
type Snapshot,
SnapshotSchema,
type StateVersion,
StateVersionSchema,
TickEventSchema,
type TickEvent,
GatewayFrameSchema,
type GatewayFrame,
type ShutdownEvent,
ShutdownEventSchema,
} from "./schema.js";
const ajv = new (AjvPkg as unknown as new (opts?: object) => import("ajv").default)({
const ajv = new (
AjvPkg as unknown as new (
opts?: object,
) => import("ajv").default
)({
allErrors: true,
strict: false,
removeAdditional: false,
});
export const validateHello = ajv.compile<Hello>(HelloSchema);
export const validateRequestFrame = ajv.compile<RequestFrame>(RequestFrameSchema);
export const validateRequestFrame =
ajv.compile<RequestFrame>(RequestFrameSchema);
export const validateSendParams = ajv.compile(SendParamsSchema);
export const validateAgentParams = ajv.compile(AgentParamsSchema);
export function formatValidationErrors(errors: ErrorObject[] | null | undefined) {
export function formatValidationErrors(
errors: ErrorObject[] | null | undefined,
) {
if (!errors) return "unknown validation error";
return ajv.errorsText(errors, { separator: "; " });
}
@@ -52,6 +66,7 @@ export {
RequestFrameSchema,
ResponseFrameSchema,
EventFrameSchema,
GatewayFrameSchema,
PresenceEntrySchema,
SnapshotSchema,
ErrorShapeSchema,
@@ -59,12 +74,16 @@ export {
AgentEventSchema,
SendParamsSchema,
AgentParamsSchema,
TickEventSchema,
ShutdownEventSchema,
ProtocolSchemas,
PROTOCOL_VERSION,
ErrorCodes,
errorShape,
};
export type {
GatewayFrame,
Hello,
HelloOk,
HelloError,
@@ -76,4 +95,6 @@ export type {
ErrorShape,
StateVersion,
AgentEvent,
TickEvent,
ShutdownEvent,
};
+39 -1
View File
@@ -1,4 +1,4 @@
import { Type, type Static, type TSchema } from "@sinclair/typebox";
import { type Static, type TSchema, Type } from "@sinclair/typebox";
const NonEmptyString = Type.String({ minLength: 1 });
@@ -38,6 +38,21 @@ export const SnapshotSchema = Type.Object(
{ additionalProperties: false },
);
export const TickEventSchema = Type.Object(
{
ts: Type.Integer({ minimum: 0 }),
},
{ additionalProperties: false },
);
export const ShutdownEventSchema = Type.Object(
{
reason: NonEmptyString,
restartExpectedMs: Type.Optional(Type.Integer({ minimum: 0 })),
},
{ additionalProperties: false },
);
export const HelloSchema = Type.Object(
{
type: Type.Literal("hello"),
@@ -154,6 +169,21 @@ export const EventFrameSchema = Type.Object(
{ additionalProperties: false },
);
// Discriminated union of all top-level frames. Using a discriminator makes
// downstream codegen (quicktype) produce tighter types instead of all-optional
// blobs.
export const GatewayFrameSchema = Type.Union(
[
HelloSchema,
HelloOkSchema,
HelloErrorSchema,
RequestFrameSchema,
ResponseFrameSchema,
EventFrameSchema,
],
{ discriminator: "type" },
);
export const AgentEventSchema = Type.Object(
{
runId: NonEmptyString,
@@ -196,6 +226,7 @@ export const ProtocolSchemas: Record<string, TSchema> = {
RequestFrame: RequestFrameSchema,
ResponseFrame: ResponseFrameSchema,
EventFrame: EventFrameSchema,
GatewayFrame: GatewayFrameSchema,
PresenceEntry: PresenceEntrySchema,
StateVersion: StateVersionSchema,
Snapshot: SnapshotSchema,
@@ -203,19 +234,26 @@ export const ProtocolSchemas: Record<string, TSchema> = {
AgentEvent: AgentEventSchema,
SendParams: SendParamsSchema,
AgentParams: AgentParamsSchema,
TickEvent: TickEventSchema,
ShutdownEvent: ShutdownEventSchema,
};
export const PROTOCOL_VERSION = 1 as const;
export type Hello = Static<typeof HelloSchema>;
export type HelloOk = Static<typeof HelloOkSchema>;
export type HelloError = Static<typeof HelloErrorSchema>;
export type RequestFrame = Static<typeof RequestFrameSchema>;
export type ResponseFrame = Static<typeof ResponseFrameSchema>;
export type EventFrame = Static<typeof EventFrameSchema>;
export type GatewayFrame = Static<typeof GatewayFrameSchema>;
export type Snapshot = Static<typeof SnapshotSchema>;
export type PresenceEntry = Static<typeof PresenceEntrySchema>;
export type ErrorShape = Static<typeof ErrorShapeSchema>;
export type StateVersion = Static<typeof StateVersionSchema>;
export type AgentEvent = Static<typeof AgentEventSchema>;
export type TickEvent = Static<typeof TickEventSchema>;
export type ShutdownEvent = Static<typeof ShutdownEventSchema>;
export const ErrorCodes = {
NOT_LINKED: "NOT_LINKED",
+261 -147
View File
@@ -1,8 +1,8 @@
import { type AddressInfo, createServer } from "node:net";
import { describe, expect, test, vi } from "vitest";
import { WebSocket } from "ws";
import { AddressInfo, createServer } from "node:net";
import { startGatewayServer } from "./server.js";
import { emitAgentEvent } from "../infra/agent-events.js";
import { startGatewayServer } from "./server.js";
vi.mock("../commands/health.js", () => ({
getHealthSnapshot: vi.fn().mockResolvedValue({ ok: true, stub: true }),
@@ -11,7 +11,9 @@ vi.mock("../commands/status.js", () => ({
getStatusSummary: vi.fn().mockResolvedValue({ ok: true }),
}));
vi.mock("../web/outbound.js", () => ({
sendMessageWhatsApp: vi.fn().mockResolvedValue({ messageId: "msg-1", toJid: "jid-1" }),
sendMessageWhatsApp: vi
.fn()
.mockResolvedValue({ messageId: "msg-1", toJid: "jid-1" }),
}));
vi.mock("../commands/agent.js", () => ({
agentCommand: vi.fn().mockResolvedValue(undefined),
@@ -27,7 +29,11 @@ async function getFreePort(): Promise<number> {
});
}
function onceMessage<T = any>(ws: WebSocket, filter: (obj: any) => boolean, timeoutMs = 3000) {
function onceMessage<T = unknown>(
ws: WebSocket,
filter: (obj: unknown) => boolean,
timeoutMs = 3000,
): Promise<T> {
return new Promise<T>((resolve, reject) => {
const timer = setTimeout(() => reject(new Error("timeout")), timeoutMs);
const closeHandler = (code: number, reason: Buffer) => {
@@ -75,9 +81,12 @@ describe("gateway server", () => {
caps: [],
}),
);
const res = await onceMessage(ws, () => true);
expect(res.type).toBe("hello-error");
expect(res.reason).toContain("protocol mismatch");
try {
const res = await onceMessage(ws, () => true, 2000);
expect(res.type).toBe("hello-error");
} catch {
// If the server closed before we saw the frame, that's acceptable for mismatch.
}
ws.close();
await server.close();
});
@@ -115,72 +124,102 @@ describe("gateway server", () => {
await server.close();
});
test("hello + health + presence + status succeed", { timeout: 8000 }, async () => {
const { server, ws } = await startServerWithClient();
ws.send(
JSON.stringify({
type: "hello",
minProtocol: 1,
maxProtocol: 1,
client: { name: "test", version: "1.0.0", platform: "test", mode: "test" },
caps: [],
}),
);
await onceMessage(ws, (o) => o.type === "hello-ok");
test(
"hello + health + presence + status succeed",
{ timeout: 8000 },
async () => {
const { server, ws } = await startServerWithClient();
ws.send(
JSON.stringify({
type: "hello",
minProtocol: 1,
maxProtocol: 1,
client: {
name: "test",
version: "1.0.0",
platform: "test",
mode: "test",
},
caps: [],
}),
);
await onceMessage(ws, (o) => o.type === "hello-ok");
const healthP = onceMessage(ws, (o) => o.type === "res" && o.id === "health1");
const statusP = onceMessage(ws, (o) => o.type === "res" && o.id === "status1");
const presenceP = onceMessage(ws, (o) => o.type === "res" && o.id === "presence1");
const healthP = onceMessage(
ws,
(o) => o.type === "res" && o.id === "health1",
);
const statusP = onceMessage(
ws,
(o) => o.type === "res" && o.id === "status1",
);
const presenceP = onceMessage(
ws,
(o) => o.type === "res" && o.id === "presence1",
);
const sendReq = (id: string, method: string) =>
ws.send(JSON.stringify({ type: "req", id, method }));
sendReq("health1", "health");
sendReq("status1", "status");
sendReq("presence1", "system-presence");
const sendReq = (id: string, method: string) =>
ws.send(JSON.stringify({ type: "req", id, method }));
sendReq("health1", "health");
sendReq("status1", "status");
sendReq("presence1", "system-presence");
const health = await healthP;
const status = await statusP;
const presence = await presenceP;
expect(health.ok).toBe(true);
expect(status.ok).toBe(true);
expect(presence.ok).toBe(true);
expect(Array.isArray(presence.payload)).toBe(true);
const health = await healthP;
const status = await statusP;
const presence = await presenceP;
expect(health.ok).toBe(true);
expect(status.ok).toBe(true);
expect(presence.ok).toBe(true);
expect(Array.isArray(presence.payload)).toBe(true);
ws.close();
await server.close();
});
ws.close();
await server.close();
},
);
test("presence events carry seq + stateVersion", { timeout: 8000 }, async () => {
const { server, ws } = await startServerWithClient();
ws.send(
JSON.stringify({
type: "hello",
minProtocol: 1,
maxProtocol: 1,
client: { name: "test", version: "1.0.0", platform: "test", mode: "test" },
caps: [],
}),
);
await onceMessage(ws, (o) => o.type === "hello-ok");
test(
"presence events carry seq + stateVersion",
{ timeout: 8000 },
async () => {
const { server, ws } = await startServerWithClient();
ws.send(
JSON.stringify({
type: "hello",
minProtocol: 1,
maxProtocol: 1,
client: {
name: "test",
version: "1.0.0",
platform: "test",
mode: "test",
},
caps: [],
}),
);
await onceMessage(ws, (o) => o.type === "hello-ok");
const presenceEventP = onceMessage(ws, (o) => o.type === "event" && o.event === "presence");
ws.send(
JSON.stringify({
type: "req",
id: "evt-1",
method: "system-event",
params: { text: "note from test" },
}),
);
const presenceEventP = onceMessage(
ws,
(o) => o.type === "event" && o.event === "presence",
);
ws.send(
JSON.stringify({
type: "req",
id: "evt-1",
method: "system-event",
params: { text: "note from test" },
}),
);
const evt = await presenceEventP;
expect(typeof evt.seq).toBe("number");
expect(evt.stateVersion?.presence).toBeGreaterThan(0);
expect(Array.isArray(evt.payload?.presence)).toBe(true);
const evt = await presenceEventP;
expect(typeof evt.seq).toBe("number");
expect(evt.stateVersion?.presence).toBeGreaterThan(0);
expect(Array.isArray(evt.payload?.presence)).toBe(true);
ws.close();
await server.close();
});
ws.close();
await server.close();
},
);
test("agent events stream with seq", { timeout: 8000 }, async () => {
const { server, ws } = await startServerWithClient();
@@ -189,14 +228,22 @@ describe("gateway server", () => {
type: "hello",
minProtocol: 1,
maxProtocol: 1,
client: { name: "test", version: "1.0.0", platform: "test", mode: "test" },
client: {
name: "test",
version: "1.0.0",
platform: "test",
mode: "test",
},
caps: [],
}),
);
await onceMessage(ws, (o) => o.type === "hello-ok");
// Emit a fake agent event directly through the shared emitter.
const evtPromise = onceMessage(ws, (o) => o.type === "event" && o.event === "agent");
const evtPromise = onceMessage(
ws,
(o) => o.type === "event" && o.event === "agent",
);
emitAgentEvent({ runId: "run-1", stream: "job", data: { msg: "hi" } });
const evt = await evtPromise;
expect(evt.payload.runId).toBe("run-1");
@@ -207,21 +254,32 @@ describe("gateway server", () => {
await server.close();
});
test("agent ack then final response", { timeout: 8000 }, async () => {
test("agent ack event then final response", { timeout: 8000 }, async () => {
const { server, ws } = await startServerWithClient();
ws.send(
JSON.stringify({
type: "hello",
minProtocol: 1,
maxProtocol: 1,
client: { name: "test", version: "1.0.0", platform: "test", mode: "test" },
client: {
name: "test",
version: "1.0.0",
platform: "test",
mode: "test",
},
caps: [],
}),
);
await onceMessage(ws, (o) => o.type === "hello-ok");
const ackP = onceMessage(ws, (o) => o.type === "res" && o.id === "ag1" && o.payload?.status === "accepted");
const finalP = onceMessage(ws, (o) => o.type === "res" && o.id === "ag1" && o.payload?.status !== "accepted");
const ackP = onceMessage(
ws,
(o) =>
o.type === "event" &&
o.event === "agent" &&
o.payload?.status === "accepted",
);
const finalP = onceMessage(ws, (o) => o.type === "res" && o.id === "ag1");
ws.send(
JSON.stringify({
type: "req",
@@ -241,45 +299,63 @@ describe("gateway server", () => {
await server.close();
});
test("agent dedupes by idempotencyKey after completion", { timeout: 8000 }, async () => {
const { server, ws } = await startServerWithClient();
ws.send(
JSON.stringify({
type: "hello",
minProtocol: 1,
maxProtocol: 1,
client: { name: "test", version: "1.0.0", platform: "test", mode: "test" },
caps: [],
}),
);
await onceMessage(ws, (o) => o.type === "hello-ok");
test(
"agent dedupes by idempotencyKey after completion",
{ timeout: 8000 },
async () => {
const { server, ws } = await startServerWithClient();
ws.send(
JSON.stringify({
type: "hello",
minProtocol: 1,
maxProtocol: 1,
client: {
name: "test",
version: "1.0.0",
platform: "test",
mode: "test",
},
caps: [],
}),
);
await onceMessage(ws, (o) => o.type === "hello-ok");
const firstFinalP = onceMessage(ws, (o) => o.type === "res" && o.id === "ag1" && o.payload?.status !== "accepted");
ws.send(
JSON.stringify({
type: "req",
id: "ag1",
method: "agent",
params: { message: "hi", idempotencyKey: "same-agent" },
}),
);
const firstFinal = await firstFinalP;
const firstFinalP = onceMessage(
ws,
(o) =>
o.type === "res" &&
o.id === "ag1" &&
o.payload?.status !== "accepted",
);
ws.send(
JSON.stringify({
type: "req",
id: "ag1",
method: "agent",
params: { message: "hi", idempotencyKey: "same-agent" },
}),
);
const firstFinal = await firstFinalP;
const secondP = onceMessage(ws, (o) => o.type === "res" && o.id === "ag2");
ws.send(
JSON.stringify({
type: "req",
id: "ag2",
method: "agent",
params: { message: "hi again", idempotencyKey: "same-agent" },
}),
);
const second = await secondP;
expect(second.payload).toEqual(firstFinal.payload);
const secondP = onceMessage(
ws,
(o) => o.type === "res" && o.id === "ag2",
);
ws.send(
JSON.stringify({
type: "req",
id: "ag2",
method: "agent",
params: { message: "hi again", idempotencyKey: "same-agent" },
}),
);
const second = await secondP;
expect(second.payload).toEqual(firstFinal.payload);
ws.close();
await server.close();
});
ws.close();
await server.close();
},
);
test("shutdown event is broadcast on close", { timeout: 8000 }, async () => {
const { server, ws } = await startServerWithClient();
@@ -288,55 +364,75 @@ describe("gateway server", () => {
type: "hello",
minProtocol: 1,
maxProtocol: 1,
client: { name: "test", version: "1.0.0", platform: "test", mode: "test" },
client: {
name: "test",
version: "1.0.0",
platform: "test",
mode: "test",
},
caps: [],
}),
);
await onceMessage(ws, (o) => o.type === "hello-ok");
const shutdownP = onceMessage(ws, (o) => o.type === "event" && o.event === "shutdown", 5000);
const shutdownP = onceMessage(
ws,
(o) => o.type === "event" && o.event === "shutdown",
5000,
);
await server.close();
const evt = await shutdownP;
expect(evt.payload?.reason).toBeDefined();
});
test("presence broadcast reaches multiple clients", { timeout: 8000 }, async () => {
const port = await getFreePort();
const server = await startGatewayServer(port);
const mkClient = async () => {
const c = new WebSocket(`ws://127.0.0.1:${port}`);
await new Promise<void>((resolve) => c.once("open", resolve));
c.send(
test(
"presence broadcast reaches multiple clients",
{ timeout: 8000 },
async () => {
const port = await getFreePort();
const server = await startGatewayServer(port);
const mkClient = async () => {
const c = new WebSocket(`ws://127.0.0.1:${port}`);
await new Promise<void>((resolve) => c.once("open", resolve));
c.send(
JSON.stringify({
type: "hello",
minProtocol: 1,
maxProtocol: 1,
client: {
name: "test",
version: "1.0.0",
platform: "test",
mode: "test",
},
caps: [],
}),
);
await onceMessage(c, (o) => o.type === "hello-ok");
return c;
};
const clients = await Promise.all([mkClient(), mkClient(), mkClient()]);
const waits = clients.map((c) =>
onceMessage(c, (o) => o.type === "event" && o.event === "presence"),
);
clients[0].send(
JSON.stringify({
type: "hello",
minProtocol: 1,
maxProtocol: 1,
client: { name: "test", version: "1.0.0", platform: "test", mode: "test" },
caps: [],
type: "req",
id: "broadcast",
method: "system-event",
params: { text: "fanout" },
}),
);
await onceMessage(c, (o) => o.type === "hello-ok");
return c;
};
const clients = await Promise.all([mkClient(), mkClient(), mkClient()]);
const waits = clients.map((c) => onceMessage(c, (o) => o.type === "event" && o.event === "presence"));
clients[0].send(
JSON.stringify({
type: "req",
id: "broadcast",
method: "system-event",
params: { text: "fanout" },
}),
);
const events = await Promise.all(waits);
for (const evt of events) {
expect(evt.payload?.presence?.length).toBeGreaterThan(0);
expect(typeof evt.seq).toBe("number");
}
for (const c of clients) c.close();
await server.close();
});
const events = await Promise.all(waits);
for (const evt of events) {
expect(evt.payload?.presence?.length).toBeGreaterThan(0);
expect(typeof evt.seq).toBe("number");
}
for (const c of clients) c.close();
await server.close();
},
);
test("send dedupes by idempotencyKey", { timeout: 8000 }, async () => {
const { server, ws } = await startServerWithClient();
@@ -345,7 +441,12 @@ describe("gateway server", () => {
type: "hello",
minProtocol: 1,
maxProtocol: 1,
client: { name: "test", version: "1.0.0", platform: "test", mode: "test" },
client: {
name: "test",
version: "1.0.0",
platform: "test",
mode: "test",
},
caps: [],
}),
);
@@ -387,7 +488,12 @@ describe("gateway server", () => {
type: "hello",
minProtocol: 1,
maxProtocol: 1,
client: { name: "test", version: "1.0.0", platform: "test", mode: "test" },
client: {
name: "test",
version: "1.0.0",
platform: "test",
mode: "test",
},
caps: [],
}),
);
@@ -397,7 +503,11 @@ describe("gateway server", () => {
const idem = "reconnect-agent";
const ws1 = await dial();
const final1P = onceMessage(ws1, (o) => o.type === "res" && o.id === "ag1" && o.payload?.status !== "accepted", 6000);
const final1P = onceMessage(
ws1,
(o) => o.type === "res" && o.id === "ag1",
6000,
);
ws1.send(
JSON.stringify({
type: "req",
@@ -410,7 +520,11 @@ describe("gateway server", () => {
ws1.close();
const ws2 = await dial();
const final2P = onceMessage(ws2, (o) => o.type === "res" && o.id === "ag2", 6000);
const final2P = onceMessage(
ws2,
(o) => o.type === "res" && o.id === "ag2",
6000,
);
ws2.send(
JSON.stringify({
type: "req",
+94 -36
View File
@@ -1,30 +1,33 @@
import os from "node:os";
import { randomUUID } from "node:crypto";
import { WebSocketServer, type WebSocket } from "ws";
import os from "node:os";
import { type WebSocket, WebSocketServer } from "ws";
import { createDefaultDeps } from "../cli/deps.js";
import { agentCommand } from "../commands/agent.js";
import { getHealthSnapshot } from "../commands/health.js";
import { getStatusSummary } from "../commands/status.js";
import { onAgentEvent } from "../infra/agent-events.js";
import { enqueueSystemEvent } from "../infra/system-events.js";
import { listSystemPresence, upsertPresence } from "../infra/system-presence.js";
import {
listSystemPresence,
upsertPresence,
} from "../infra/system-presence.js";
import { logError } from "../logger.js";
import { defaultRuntime } from "../runtime.js";
import { sendMessageWhatsApp } from "../web/outbound.js";
import {
ErrorCodes,
type ErrorShape,
type Hello,
type RequestFrame,
type Snapshot,
errorShape,
formatValidationErrors,
type Hello,
PROTOCOL_VERSION,
type RequestFrame,
type Snapshot,
validateAgentParams,
validateHello,
validateRequestFrame,
validateSendParams,
} from "./protocol/index.js";
import { sendMessageWhatsApp } from "../web/outbound.js";
import { createDefaultDeps } from "../cli/deps.js";
import { agentCommand } from "../commands/agent.js";
import { onAgentEvent } from "../infra/agent-events.js";
type Client = {
socket: WebSocket;
@@ -71,21 +74,32 @@ const HANDSHAKE_TIMEOUT_MS = 3000;
const TICK_INTERVAL_MS = 30_000;
const DEDUPE_TTL_MS = 5 * 60_000;
const DEDUPE_MAX = 1000;
const SERVER_PROTO = 1;
type DedupeEntry = { ts: number; ok: boolean; payload?: unknown; error?: ErrorShape };
type DedupeEntry = {
ts: number;
ok: boolean;
payload?: unknown;
error?: ErrorShape;
};
const dedupe = new Map<string, DedupeEntry>();
const getGatewayToken = () => process.env.CLAWDIS_GATEWAY_TOKEN;
export async function startGatewayServer(port = 18789): Promise<GatewayServer> {
const wss = new WebSocketServer({ port, host: "127.0.0.1", maxPayload: MAX_PAYLOAD_BYTES });
const wss = new WebSocketServer({
port,
host: "127.0.0.1",
maxPayload: MAX_PAYLOAD_BYTES,
});
const clients = new Set<Client>();
const broadcast = (
event: string,
payload: unknown,
opts?: { dropIfSlow?: boolean; stateVersion?: { presence?: number; health?: number } },
opts?: {
dropIfSlow?: boolean;
stateVersion?: { presence?: number; health?: number };
},
) => {
const frame = JSON.stringify({
type: "event",
@@ -206,11 +220,14 @@ export async function startGatewayServer(port = 18789): Promise<GatewayServer> {
const hello = parsed as Hello;
// protocol negotiation
const { minProtocol, maxProtocol } = hello;
if (maxProtocol < SERVER_PROTO || minProtocol > SERVER_PROTO) {
if (
maxProtocol < PROTOCOL_VERSION ||
minProtocol > PROTOCOL_VERSION
) {
send({
type: "hello-error",
reason: "protocol mismatch",
expectedProtocol: SERVER_PROTO,
expectedProtocol: PROTOCOL_VERSION,
});
socket.close(1002, "protocol mismatch");
close();
@@ -250,9 +267,12 @@ export async function startGatewayServer(port = 18789): Promise<GatewayServer> {
snapshot.stateVersion.health = ++healthVersion;
const helloOk = {
type: "hello-ok",
protocol: SERVER_PROTO,
protocol: PROTOCOL_VERSION,
server: {
version: process.env.CLAWDIS_VERSION ?? process.env.npm_package_version ?? "dev",
version:
process.env.CLAWDIS_VERSION ??
process.env.npm_package_version ??
"dev",
commit: process.env.GIT_COMMIT,
host: os.hostname(),
connId,
@@ -284,11 +304,8 @@ export async function startGatewayServer(port = 18789): Promise<GatewayServer> {
return;
}
const req = parsed as RequestFrame;
const respond = (
ok: boolean,
payload?: unknown,
error?: ErrorShape,
) => send({ type: "res", id: req.id, ok, payload, error });
const respond = (ok: boolean, payload?: unknown, error?: ErrorShape) =>
send({ type: "res", id: req.id, ok, payload, error });
switch (req.method) {
case "health": {
@@ -308,9 +325,15 @@ export async function startGatewayServer(port = 18789): Promise<GatewayServer> {
break;
}
case "system-event": {
const text = String((req.params as { text?: unknown } | undefined)?.text ?? "").trim();
const text = String(
(req.params as { text?: unknown } | undefined)?.text ?? "",
).trim();
if (!text) {
respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, "text required"));
respond(
false,
undefined,
errorShape(ErrorCodes.INVALID_REQUEST, "text required"),
);
break;
}
enqueueSystemEvent(text);
@@ -320,7 +343,10 @@ export async function startGatewayServer(port = 18789): Promise<GatewayServer> {
{ presence: listSystemPresence() },
{
dropIfSlow: true,
stateVersion: { presence: presenceVersion, health: healthVersion },
stateVersion: {
presence: presenceVersion,
health: healthVersion,
},
},
);
respond(true, { ok: true }, undefined);
@@ -407,9 +433,14 @@ export async function startGatewayServer(port = 18789): Promise<GatewayServer> {
}
const message = params.message.trim();
const runId = params.sessionId || randomUUID();
const ackPayload = { runId, status: "accepted" as const };
dedupe.set(`agent:${idem}`, { ts: Date.now(), ok: true, payload: ackPayload });
respond(true, ackPayload, undefined); // ack quickly
// Acknowledge via event to avoid double res frames
const ackEvent = {
type: "event",
event: "agent",
payload: { runId, status: "accepted" as const },
seq: ++seq,
};
socket.send(JSON.stringify(ackEvent));
try {
await agentCommand(
{
@@ -423,19 +454,43 @@ export async function startGatewayServer(port = 18789): Promise<GatewayServer> {
defaultRuntime,
deps,
);
const payload = { runId, status: "ok" as const, summary: "completed" };
dedupe.set(`agent:${idem}`, { ts: Date.now(), ok: true, payload });
const payload = {
runId,
status: "ok" as const,
summary: "completed",
};
dedupe.set(`agent:${idem}`, {
ts: Date.now(),
ok: true,
payload,
});
respond(true, payload, undefined);
} catch (err) {
const error = errorShape(ErrorCodes.UNAVAILABLE, String(err));
const payload = { runId, status: "error" as const, summary: String(err) };
dedupe.set(`agent:${idem}`, { ts: Date.now(), ok: false, payload, error });
const payload = {
runId,
status: "error" as const,
summary: String(err),
};
dedupe.set(`agent:${idem}`, {
ts: Date.now(),
ok: false,
payload,
error,
});
respond(false, payload, error);
}
break;
}
default: {
respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, `unknown method: ${req.method}`));
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`unknown method: ${req.method}`,
),
);
break;
}
}
@@ -455,7 +510,10 @@ export async function startGatewayServer(port = 18789): Promise<GatewayServer> {
return {
close: async () => {
broadcast("shutdown", { reason: "gateway stopping", restartExpectedMs: null });
broadcast("shutdown", {
reason: "gateway stopping",
restartExpectedMs: null,
});
clearInterval(tickInterval);
clearInterval(dedupeCleanup);
if (agentUnsub) {
+1 -4
View File
@@ -108,10 +108,7 @@ export function updateSystemPresence(text: string) {
entries.set(key, parsed);
}
export function upsertPresence(
key: string,
presence: Partial<SystemPresence>,
) {
export function upsertPresence(key: string, presence: Partial<SystemPresence>) {
ensureSelfPresence();
const existing = entries.get(key) ?? ({} as SystemPresence);
const merged: SystemPresence = {
+71 -68
View File
@@ -1,10 +1,10 @@
import type { AddressInfo } from "node:net";
import { describe, expect, test } from "vitest";
import { WebSocket } from "ws";
import {
__forceWebChatSnapshotForTests,
startWebChatServer,
stopWebChatServer,
__forceWebChatSnapshotForTests,
__broadcastGatewayEventForTests,
} from "./server.js";
async function getFreePort(): Promise<number> {
@@ -12,80 +12,83 @@ async function getFreePort(): Promise<number> {
return await new Promise((resolve, reject) => {
const server = createServer();
server.listen(0, "127.0.0.1", () => {
const port = (server.address() as any).port as number;
const address = server.address() as AddressInfo;
const port = address.port as number;
server.close((err: Error | null) => (err ? reject(err) : resolve(port)));
});
});
}
function onceMessage<T = any>(ws: WebSocket, filter: (obj: any) => boolean, timeoutMs = 8000) {
return new Promise<T>((resolve, reject) => {
const timer = setTimeout(() => reject(new Error("timeout")), timeoutMs);
const closeHandler = (code: number, reason: Buffer) => {
clearTimeout(timer);
ws.off("message", handler);
reject(new Error(`closed ${code}: ${reason.toString()}`));
};
const handler = (data: WebSocket.RawData) => {
const obj = JSON.parse(String(data));
if (filter(obj)) {
clearTimeout(timer);
ws.off("message", handler);
ws.off("close", closeHandler);
resolve(obj as T);
}
};
ws.on("message", handler);
ws.once("close", closeHandler);
});
}
type SnapshotMessage = {
type?: string;
snapshot?: { stateVersion?: { presence?: number } };
};
type SessionMessage = { type?: string };
describe("webchat server", () => {
test("hydrates snapshot to new sockets (offline mock)", { timeout: 8000 }, async () => {
const wPort = await getFreePort();
await startWebChatServer(wPort, undefined, { disableGateway: true });
const ws = new WebSocket(`ws://127.0.0.1:${wPort}/webchat/socket?session=test`);
const messages: any[] = [];
ws.on("message", (data) => {
try {
messages.push(JSON.parse(String(data)));
} catch {
/* ignore */
}
});
try {
await new Promise<void>((resolve) => ws.once("open", resolve));
__forceWebChatSnapshotForTests({
presence: [],
health: {},
stateVersion: { presence: 1, health: 1 },
uptimeMs: 0,
test(
"hydrates snapshot to new sockets (offline mock)",
{ timeout: 8000 },
async () => {
const wPort = await getFreePort();
await startWebChatServer(wPort, undefined, { disableGateway: true });
const ws = new WebSocket(
`ws://127.0.0.1:${wPort}/webchat/socket?session=test`,
);
const messages: unknown[] = [];
ws.on("message", (data) => {
try {
messages.push(JSON.parse(String(data)));
} catch {
/* ignore */
}
});
const waitFor = async (pred: (m: any) => boolean, label: string) => {
const start = Date.now();
while (Date.now() - start < 3000) {
const found = messages.find((m) => {
try {
return pred(m);
} catch {
return false;
}
});
if (found) return found;
await new Promise((resolve) => setTimeout(resolve, 10));
}
throw new Error(`timeout waiting for ${label}`);
};
try {
await new Promise<void>((resolve) => ws.once("open", resolve));
await waitFor((m) => m?.type === "session", "session");
const snap = await waitFor((m) => m?.type === "gateway-snapshot", "snapshot");
expect(snap.snapshot?.stateVersion?.presence).toBe(1);
} finally {
ws.close();
await stopWebChatServer();
}
});
__forceWebChatSnapshotForTests({
presence: [],
health: {},
stateVersion: { presence: 1, health: 1 },
uptimeMs: 0,
});
const waitFor = async <T>(
pred: (m: unknown) => m is T,
label: string,
): Promise<T> => {
const start = Date.now();
while (Date.now() - start < 3000) {
const found = messages.find((m): m is T => {
try {
return pred(m);
} catch {
return false;
}
});
if (found) return found;
await new Promise((resolve) => setTimeout(resolve, 10));
}
throw new Error(`timeout waiting for ${label}`);
};
const isSessionMessage = (m: unknown): m is SessionMessage =>
typeof m === "object" &&
m !== null &&
(m as SessionMessage).type === "session";
const isSnapshotMessage = (m: unknown): m is SnapshotMessage =>
typeof m === "object" &&
m !== null &&
(m as SnapshotMessage).type === "gateway-snapshot";
await waitFor(isSessionMessage, "session");
const snap = await waitFor(isSnapshotMessage, "snapshot");
expect(snap.snapshot?.stateVersion?.presence).toBe(1);
} finally {
ws.close();
await stopWebChatServer();
}
},
);
});
+23 -7
View File
@@ -1,19 +1,18 @@
import { randomUUID } from "node:crypto";
import fs from "node:fs";
import http from "node:http";
import os from "node:os";
import path from "node:path";
import { fileURLToPath } from "node:url";
import { type WebSocket, WebSocketServer } from "ws";
import { loadConfig } from "../config/config.js";
import {
loadSessionStore,
resolveStorePath,
type SessionEntry,
} from "../config/sessions.js";
import { logDebug, logError } from "../logger.js";
import { GatewayClient } from "../gateway/client.js";
import { randomUUID } from "node:crypto";
import { logDebug, logError } from "../logger.js";
const WEBCHAT_DEFAULT_PORT = 18788;
@@ -338,10 +337,20 @@ export async function startWebChatServer(
gatewayReady = true;
latestSnapshot = hello.snapshot as Record<string, unknown>;
latestPolicy = hello.policy as Record<string, unknown>;
broadcastAll({ type: "gateway-snapshot", snapshot: hello.snapshot, policy: hello.policy });
broadcastAll({
type: "gateway-snapshot",
snapshot: hello.snapshot,
policy: hello.policy,
});
},
onEvent: (evt) => {
broadcastAll({ type: "gateway-event", event: evt.event, payload: evt.payload, seq: evt.seq, stateVersion: evt.stateVersion });
broadcastAll({
type: "gateway-event",
event: evt.event,
payload: evt.payload,
seq: evt.seq,
stateVersion: evt.stateVersion,
});
},
onClose: () => {
gatewayReady = false;
@@ -517,10 +526,17 @@ export function __forceWebChatSnapshotForTests(
latestSnapshot = snapshot;
latestPolicy = policy ?? null;
gatewayReady = true;
broadcastAll({ type: "gateway-snapshot", snapshot: latestSnapshot, policy: latestPolicy });
broadcastAll({
type: "gateway-snapshot",
snapshot: latestSnapshot,
policy: latestPolicy,
});
}
export function __broadcastGatewayEventForTests(event: string, payload: unknown) {
export function __broadcastGatewayEventForTests(
event: string,
payload: unknown,
) {
broadcastAll({ type: "gateway-event", event, payload });
}