feat(bridge): add Bonjour node bridge

This commit is contained in:
Peter Steinberger
2025-12-12 21:18:46 +00:00
parent b9007dc721
commit 0b532579d8
17 changed files with 1002 additions and 13 deletions
@@ -0,0 +1,273 @@
import ClawdisNodeKit
import Foundation
import Network
import OSLog
actor BridgeConnectionHandler {
private let connection: NWConnection
private let logger: Logger
private let decoder = JSONDecoder()
private let encoder = JSONEncoder()
private let queue = DispatchQueue(label: "com.steipete.clawdis.bridge.connection")
private var buffer = Data()
private var isAuthenticated = false
private var nodeId: String?
private var pendingInvokes: [String: CheckedContinuation<BridgeInvokeResponse, Error>] = [:]
private var isClosed = false
init(connection: NWConnection, logger: Logger) {
self.connection = connection
self.logger = logger
}
enum AuthResult: Sendable {
case ok
case notPaired
case unauthorized
case error(code: String, message: String)
}
enum PairResult: Sendable {
case ok(token: String)
case rejected
case error(code: String, message: String)
}
func run(
resolveAuth: @escaping @Sendable (BridgeHello) async -> AuthResult,
handlePair: @escaping @Sendable (BridgePairRequest) async -> PairResult,
onAuthenticated: (@Sendable (String) async -> Void)? = nil,
onDisconnected: (@Sendable (String) async -> Void)? = nil,
onEvent: (@Sendable (String, BridgeEventFrame) async -> Void)? = nil) async
{
self.connection.stateUpdateHandler = { [logger] state in
switch state {
case .ready:
logger.debug("bridge conn ready")
case let .failed(err):
logger.error("bridge conn failed: \(err.localizedDescription, privacy: .public)")
default:
break
}
}
self.connection.start(queue: self.queue)
while true {
do {
guard let line = try await self.receiveLine() else { break }
guard let data = line.data(using: .utf8) else { continue }
let base = try self.decoder.decode(BridgeBaseFrame.self, from: data)
switch base.type {
case "hello":
let hello = try self.decoder.decode(BridgeHello.self, from: data)
self.nodeId = hello.nodeId
let result = await resolveAuth(hello)
await self.handleAuthResult(
result,
serverName: Host.current().localizedName ?? ProcessInfo.processInfo.hostName)
if case .ok = result, let nodeId = self.nodeId {
await onAuthenticated?(nodeId)
}
case "pair-request":
let req = try self.decoder.decode(BridgePairRequest.self, from: data)
self.nodeId = req.nodeId
let result = await handlePair(req)
await self.handlePairResult(
result,
serverName: Host.current().localizedName ?? ProcessInfo.processInfo.hostName)
if case .ok = result, let nodeId = self.nodeId {
await onAuthenticated?(nodeId)
}
case "event":
guard self.isAuthenticated, let nodeId = self.nodeId else {
await self.sendError(code: "UNAUTHORIZED", message: "not authenticated")
continue
}
let evt = try self.decoder.decode(BridgeEventFrame.self, from: data)
await onEvent?(nodeId, evt)
case "ping":
if !self.isAuthenticated {
await self.sendError(code: "UNAUTHORIZED", message: "not authenticated")
continue
}
let ping = try self.decoder.decode(BridgePing.self, from: data)
try await self.send(BridgePong(type: "pong", id: ping.id))
case "invoke-res":
guard self.isAuthenticated else {
await self.sendError(code: "UNAUTHORIZED", message: "not authenticated")
continue
}
let res = try self.decoder.decode(BridgeInvokeResponse.self, from: data)
if let cont = self.pendingInvokes.removeValue(forKey: res.id) {
cont.resume(returning: res)
}
default:
await self.sendError(code: "INVALID_REQUEST", message: "unknown type")
}
} catch {
await self.sendError(code: "INVALID_REQUEST", message: error.localizedDescription)
}
}
await self.close(with: onDisconnected)
}
private func handlePairResult(_ result: PairResult, serverName: String) async {
switch result {
case let .ok(token):
do {
try await self.send(BridgePairOk(type: "pair-ok", token: token))
self.isAuthenticated = true
try await self.send(BridgeHelloOk(type: "hello-ok", serverName: serverName))
} catch {
self.logger.error("bridge send pair-ok failed: \(error.localizedDescription, privacy: .public)")
}
case .rejected:
await self.sendError(code: "UNAUTHORIZED", message: "pairing rejected")
case let .error(code, message):
await self.sendError(code: code, message: message)
}
}
private func handleAuthResult(_ result: AuthResult, serverName: String) async {
switch result {
case .ok:
self.isAuthenticated = true
do {
try await self.send(BridgeHelloOk(type: "hello-ok", serverName: serverName))
} catch {
self.logger.error("bridge send hello-ok failed: \(error.localizedDescription, privacy: .public)")
}
case .notPaired:
await self.sendError(code: "NOT_PAIRED", message: "pairing required")
case .unauthorized:
await self.sendError(code: "UNAUTHORIZED", message: "invalid token")
case let .error(code, message):
await self.sendError(code: code, message: message)
}
}
private func sendError(code: String, message: String) async {
do {
try await self.send(BridgeErrorFrame(type: "error", code: code, message: message))
} catch {
self.logger.error("bridge send error failed: \(error.localizedDescription, privacy: .public)")
}
}
func invoke(command: String, paramsJSON: String?) async throws -> BridgeInvokeResponse {
guard self.isAuthenticated else {
throw NSError(domain: "Bridge", code: 1, userInfo: [
NSLocalizedDescriptionKey: "UNAUTHORIZED: not authenticated",
])
}
let id = UUID().uuidString
let req = BridgeInvokeRequest(type: "invoke", id: id, command: command, paramsJSON: paramsJSON)
let timeoutTask = Task {
try await Task.sleep(nanoseconds: 15 * 1_000_000_000)
await self.timeoutInvoke(id: id)
}
defer { timeoutTask.cancel() }
return try await withCheckedThrowingContinuation { cont in
Task { [weak self] in
guard let self else { return }
await self.beginInvoke(id: id, request: req, continuation: cont)
}
}
}
private func beginInvoke(
id: String,
request: BridgeInvokeRequest,
continuation: CheckedContinuation<BridgeInvokeResponse, Error>) async
{
self.pendingInvokes[id] = continuation
do {
try await self.send(request)
} catch {
await self.failInvoke(id: id, error: error)
}
}
private func timeoutInvoke(id: String) async {
guard let cont = self.pendingInvokes.removeValue(forKey: id) else { return }
cont.resume(throwing: NSError(domain: "Bridge", code: 3, userInfo: [
NSLocalizedDescriptionKey: "UNAVAILABLE: invoke timeout",
]))
}
private func failInvoke(id: String, error: Error) async {
guard let cont = self.pendingInvokes.removeValue(forKey: id) else { return }
cont.resume(throwing: error)
}
private func send(_ obj: some Encodable) async throws {
let data = try self.encoder.encode(obj)
var line = Data()
line.append(data)
line.append(0x0A) // \n
let _: Void = try await withCheckedThrowingContinuation { (cont: CheckedContinuation<Void, Error>) in
self.connection.send(content: line, completion: .contentProcessed { err in
if let err {
cont.resume(throwing: err)
} else {
cont.resume(returning: ())
}
})
}
}
private func receiveLine() async throws -> String? {
while true {
if let idx = self.buffer.firstIndex(of: 0x0A) {
let lineData = self.buffer.prefix(upTo: idx)
self.buffer.removeSubrange(...idx)
return String(data: lineData, encoding: .utf8)
}
let chunk = try await self.receiveChunk()
if chunk.isEmpty { return nil }
self.buffer.append(chunk)
}
}
private func receiveChunk() async throws -> Data {
try await withCheckedThrowingContinuation { cont in
self.connection
.receive(minimumIncompleteLength: 1, maximumLength: 64 * 1024) { data, _, isComplete, error in
if let error {
cont.resume(throwing: error)
return
}
if isComplete {
cont.resume(returning: Data())
return
}
cont.resume(returning: data ?? Data())
}
}
}
private func close(with onDisconnected: (@Sendable (String) async -> Void)? = nil) async {
if self.isClosed { return }
self.isClosed = true
let nodeId = self.nodeId
let pending = self.pendingInvokes.values
self.pendingInvokes.removeAll()
for cont in pending {
cont.resume(throwing: NSError(domain: "Bridge", code: 4, userInfo: [
NSLocalizedDescriptionKey: "UNAVAILABLE: connection closed",
]))
}
self.connection.cancel()
if let nodeId {
await onDisconnected?(nodeId)
}
}
}
@@ -0,0 +1,259 @@
import AppKit
import ClawdisNodeKit
import Foundation
import Network
import OSLog
actor BridgeServer {
static let shared = BridgeServer()
private let logger = Logger(subsystem: "com.steipete.clawdis", category: "bridge")
private var listener: NWListener?
private var isRunning = false
private var store: PairedNodesStore?
private var connections: [String: BridgeConnectionHandler] = [:]
func start() async {
if self.isRunning { return }
self.isRunning = true
do {
let storeURL = try Self.defaultStoreURL()
let store = PairedNodesStore(fileURL: storeURL)
await store.load()
self.store = store
let params = NWParameters.tcp
let listener = try NWListener(using: params, on: .any)
let name = Host.current().localizedName ?? ProcessInfo.processInfo.hostName
listener.service = NWListener.Service(
name: "\(name) (Clawdis)",
type: ClawdisBonjour.bridgeServiceType,
domain: ClawdisBonjour.bridgeServiceDomain,
txtRecord: nil)
listener.newConnectionHandler = { [weak self] connection in
guard let self else { return }
Task { await self.handle(connection: connection) }
}
listener.stateUpdateHandler = { [weak self] state in
guard let self else { return }
Task { await self.handleListenerState(state) }
}
listener.start(queue: DispatchQueue(label: "com.steipete.clawdis.bridge"))
self.listener = listener
} catch {
self.logger.error("bridge start failed: \(error.localizedDescription, privacy: .public)")
self.isRunning = false
}
}
func stop() async {
self.isRunning = false
self.listener?.cancel()
self.listener = nil
}
private func handleListenerState(_ state: NWListener.State) {
switch state {
case .ready:
self.logger.info("bridge listening")
case let .failed(err):
self.logger.error("bridge listener failed: \(err.localizedDescription, privacy: .public)")
case .cancelled:
self.logger.info("bridge listener cancelled")
case .waiting:
self.logger.info("bridge listener waiting")
case .setup:
break
@unknown default:
break
}
}
private func handle(connection: NWConnection) async {
let handler = BridgeConnectionHandler(connection: connection, logger: self.logger)
await handler.run(
resolveAuth: { [weak self] hello in
await self?.authorize(hello: hello) ?? .error(code: "UNAVAILABLE", message: "bridge unavailable")
},
handlePair: { [weak self] request in
await self?.pair(request: request) ?? .error(code: "UNAVAILABLE", message: "bridge unavailable")
},
onAuthenticated: { [weak self] nodeId in
await self?.registerConnection(handler: handler, nodeId: nodeId)
},
onDisconnected: { [weak self] nodeId in
await self?.unregisterConnection(nodeId: nodeId)
},
onEvent: { [weak self] nodeId, evt in
await self?.handleEvent(nodeId: nodeId, evt: evt)
})
}
func invoke(nodeId: String, command: String, paramsJSON: String?) async throws -> BridgeInvokeResponse {
guard let handler = self.connections[nodeId] else {
throw NSError(domain: "Bridge", code: 10, userInfo: [
NSLocalizedDescriptionKey: "UNAVAILABLE: node not connected",
])
}
return try await handler.invoke(command: command, paramsJSON: paramsJSON)
}
func connectedNodeIds() -> [String] {
Array(self.connections.keys).sorted()
}
private func registerConnection(handler: BridgeConnectionHandler, nodeId: String) async {
self.connections[nodeId] = handler
await self.beacon(text: "Node connected", nodeId: nodeId, tags: ["node", "ios"])
}
private func unregisterConnection(nodeId: String) async {
self.connections.removeValue(forKey: nodeId)
await self.beacon(text: "Node disconnected", nodeId: nodeId, tags: ["node", "ios"])
}
private struct VoiceTranscriptPayload: Codable, Sendable {
var text: String
var sessionKey: String?
}
private func handleEvent(nodeId: String, evt: BridgeEventFrame) async {
switch evt.event {
case "voice.transcript":
guard let json = evt.payloadJSON, let data = json.data(using: .utf8) else {
return
}
guard let payload = try? JSONDecoder().decode(VoiceTranscriptPayload.self, from: data) else {
return
}
let text = payload.text.trimmingCharacters(in: .whitespacesAndNewlines)
guard !text.isEmpty else { return }
let sessionKey = payload.sessionKey?.trimmingCharacters(in: .whitespacesAndNewlines).nonEmpty
?? "node-\(nodeId)"
_ = await AgentRPC.shared.send(
text: text,
thinking: "low",
sessionKey: sessionKey,
deliver: false,
to: nil,
channel: "last")
default:
break
}
}
private func beacon(text: String, nodeId: String, tags: [String]) async {
do {
let params: [String: Any] = [
"text": "\(text): \(nodeId)",
"instanceId": nodeId,
"mode": "node",
"tags": tags,
]
_ = try await AgentRPC.shared.controlRequest(
method: "system-event",
params: ControlRequestParams(raw: params))
} catch {
// Best-effort only.
}
}
private func authorize(hello: BridgeHello) async -> BridgeConnectionHandler.AuthResult {
let nodeId = hello.nodeId.trimmingCharacters(in: .whitespacesAndNewlines)
if nodeId.isEmpty {
return .error(code: "INVALID_REQUEST", message: "nodeId required")
}
guard let store = self.store else {
return .error(code: "UNAVAILABLE", message: "store unavailable")
}
guard let paired = await store.find(nodeId: nodeId) else {
return .notPaired
}
guard let token = hello.token, token == paired.token else {
return .unauthorized
}
do { try await store.touchSeen(nodeId: nodeId) } catch { /* ignore */ }
return .ok
}
private func pair(request: BridgePairRequest) async -> BridgeConnectionHandler.PairResult {
let nodeId = request.nodeId.trimmingCharacters(in: .whitespacesAndNewlines)
if nodeId.isEmpty {
return .error(code: "INVALID_REQUEST", message: "nodeId required")
}
guard let store = self.store else {
return .error(code: "UNAVAILABLE", message: "store unavailable")
}
let existing = await store.find(nodeId: nodeId)
let approved = await BridgePairingApprover.approve(request: request, isRepair: existing != nil)
if !approved {
return .rejected
}
let token = UUID().uuidString.replacingOccurrences(of: "-", with: "")
let nowMs = Int(Date().timeIntervalSince1970 * 1000)
let node = PairedNode(
nodeId: nodeId,
displayName: request.displayName,
platform: request.platform,
version: request.version,
token: token,
createdAtMs: nowMs,
lastSeenAtMs: nowMs)
do {
try await store.upsert(node)
return .ok(token: token)
} catch {
return .error(code: "UNAVAILABLE", message: "failed to persist pairing")
}
}
private static func defaultStoreURL() throws -> URL {
let base = FileManager.default.urls(for: .applicationSupportDirectory, in: .userDomainMask).first
guard let base else {
throw NSError(
domain: "Bridge",
code: 1,
userInfo: [NSLocalizedDescriptionKey: "Application Support unavailable"])
}
return base
.appendingPathComponent("Clawdis", isDirectory: true)
.appendingPathComponent("bridge", isDirectory: true)
.appendingPathComponent("paired-nodes.json", isDirectory: false)
}
}
@MainActor
enum BridgePairingApprover {
static func approve(request: BridgePairRequest, isRepair: Bool) async -> Bool {
await withCheckedContinuation { cont in
let name = request.displayName ?? request.nodeId
let alert = NSAlert()
alert.messageText = isRepair ? "Re-pair Clawdis Node?" : "Pair Clawdis Node?"
alert.informativeText = """
Node: \(name)
Platform: \(request.platform ?? "unknown")
Version: \(request.version ?? "unknown")
"""
alert.addButton(withTitle: "Approve")
alert.addButton(withTitle: "Reject")
let resp = alert.runModal()
cont.resume(returning: resp == .alertFirstButtonReturn)
}
}
}
extension String {
fileprivate var nonEmpty: String? {
let trimmed = trimmingCharacters(in: .whitespacesAndNewlines)
return trimmed.isEmpty ? nil : trimmed
}
}
@@ -0,0 +1,57 @@
import Foundation
struct PairedNode: Codable, Equatable {
var nodeId: String
var displayName: String?
var platform: String?
var version: String?
var token: String
var createdAtMs: Int
var lastSeenAtMs: Int?
}
actor PairedNodesStore {
private let fileURL: URL
private var nodes: [String: PairedNode] = [:]
init(fileURL: URL) {
self.fileURL = fileURL
}
func load() {
do {
let data = try Data(contentsOf: self.fileURL)
let decoded = try JSONDecoder().decode([String: PairedNode].self, from: data)
self.nodes = decoded
} catch {
self.nodes = [:]
}
}
func all() -> [PairedNode] {
self.nodes.values.sorted { a, b in (a.displayName ?? a.nodeId) < (b.displayName ?? b.nodeId) }
}
func find(nodeId: String) -> PairedNode? {
self.nodes[nodeId]
}
func upsert(_ node: PairedNode) async throws {
self.nodes[node.nodeId] = node
try await self.persist()
}
func touchSeen(nodeId: String) async throws {
guard var node = self.nodes[nodeId] else { return }
node.lastSeenAtMs = Int(Date().timeIntervalSince1970 * 1000)
self.nodes[nodeId] = node
try await self.persist()
}
private func persist() async throws {
let dir = self.fileURL.deletingLastPathComponent()
try FileManager.default.createDirectory(at: dir, withIntermediateDirectories: true)
let data = try JSONEncoder().encode(self.nodes)
try data.write(to: self.fileURL, options: [.atomic])
}
}