diff --git a/packages/server/.env.example b/packages/server/.env.example index 18f8efd2..2131b8d1 100644 --- a/packages/server/.env.example +++ b/packages/server/.env.example @@ -1,3 +1,4 @@ PORT=3000 # USERNAME=user -# PASSWORD=1234 \ No newline at end of file +# PASSWORD=1234 +# EXECUTION_MODE=child or main \ No newline at end of file diff --git a/packages/server/src/ChildProcess.ts b/packages/server/src/ChildProcess.ts new file mode 100644 index 00000000..483379d0 --- /dev/null +++ b/packages/server/src/ChildProcess.ts @@ -0,0 +1,148 @@ +import { IChildProcessMessage, IReactFlowNode, IReactFlowObject, IRunChatflowMessageValue, INodeData } from './Interface' +import { buildLangchain, constructGraphs, getEndingNode, getStartingNodes, resolveVariables } from './utils' + +export class ChildProcess { + /** + * Stop child process when app is killed + */ + static async stopChildProcess() { + setTimeout(() => { + process.exit(0) + }, 50000) + } + + /** + * Process prediction + * @param {IRunChatflowMessageValue} messageValue + * @return {Promise} + */ + async runChildProcess(messageValue: IRunChatflowMessageValue): Promise { + process.on('SIGTERM', ChildProcess.stopChildProcess) + process.on('SIGINT', ChildProcess.stopChildProcess) + + await sendToParentProcess('start', '_') + + // Create a Queue and add our initial node in it + const { endingNodeData, chatflow, incomingInput, componentNodes } = messageValue + + let nodeToExecuteData: INodeData + let addToChatFlowPool: any = {} + + /* Don't rebuild the flow (to avoid duplicated upsert, recomputation) when all these conditions met: + * - Node Data already exists in pool + * - Still in sync (i.e the flow has not been modified since) + * - Existing overrideConfig and new overrideConfig are the same + * - Flow doesn't start with nodes that depend on incomingInput.question + ***/ + if (endingNodeData) { + nodeToExecuteData = endingNodeData + } else { + /*** Get chatflows and prepare data ***/ + const flowData = chatflow.flowData + const parsedFlowData: IReactFlowObject = JSON.parse(flowData) + const nodes = parsedFlowData.nodes + const edges = parsedFlowData.edges + + /*** Get Ending Node with Directed Graph ***/ + const { graph, nodeDependencies } = constructGraphs(nodes, edges) + const directedGraph = graph + const endingNodeId = getEndingNode(nodeDependencies, directedGraph) + if (!endingNodeId) { + await sendToParentProcess('error', `Ending node must be either a Chain or Agent`) + return + } + + const endingNodeData = nodes.find((nd) => nd.id === endingNodeId)?.data + if (!endingNodeData) { + await sendToParentProcess('error', `Ending node must be either a Chain or Agent`) + return + } + + if ( + endingNodeData.outputs && + Object.keys(endingNodeData.outputs).length && + !Object.values(endingNodeData.outputs).includes(endingNodeData.name) + ) { + await sendToParentProcess( + 'error', + `Output of ${endingNodeData.label} (${endingNodeData.id}) must be ${endingNodeData.label}, can't be an Output Prediction` + ) + return + } + + /*** Get Starting Nodes with Non-Directed Graph ***/ + const constructedObj = constructGraphs(nodes, edges, true) + const nonDirectedGraph = constructedObj.graph + const { startingNodeIds, depthQueue } = getStartingNodes(nonDirectedGraph, endingNodeId) + + /*** BFS to traverse from Starting Nodes to Ending Node ***/ + const reactFlowNodes = await buildLangchain( + startingNodeIds, + nodes, + graph, + depthQueue, + componentNodes, + incomingInput.question, + incomingInput?.overrideConfig + ) + + const nodeToExecute = reactFlowNodes.find((node: IReactFlowNode) => node.id === endingNodeId) + if (!nodeToExecute) { + await sendToParentProcess('error', `Node ${endingNodeId} not found`) + return + } + + const reactFlowNodeData: INodeData = resolveVariables(nodeToExecute.data, reactFlowNodes, incomingInput.question) + nodeToExecuteData = reactFlowNodeData + + const startingNodes = nodes.filter((nd) => startingNodeIds.includes(nd.id)) + addToChatFlowPool = { + chatflowid: chatflow.id, + nodeToExecuteData, + startingNodes, + overrideConfig: incomingInput?.overrideConfig + } + } + + const nodeInstanceFilePath = componentNodes[nodeToExecuteData.name].filePath as string + const nodeModule = await import(nodeInstanceFilePath) + const nodeInstance = new nodeModule.nodeClass() + + const result = await nodeInstance.run(nodeToExecuteData, incomingInput.question, { chatHistory: incomingInput.history }) + + await sendToParentProcess('finish', { result, addToChatFlowPool }) + } +} + +/** + * Send data back to parent process + * @param {string} key Key of message + * @param {*} value Value of message + * @returns {Promise} + */ +async function sendToParentProcess(key: string, value: any): Promise { + // tslint:disable-line:no-any + return new Promise((resolve, reject) => { + process.send!( + { + key, + value + }, + (error: Error) => { + if (error) { + return reject(error) + } + resolve() + } + ) + }) +} + +const childProcess = new ChildProcess() + +process.on('message', async (message: IChildProcessMessage) => { + if (message.key === 'start') { + await childProcess.runChildProcess(message.value) + process.exit() + } +}) diff --git a/packages/server/src/Interface.ts b/packages/server/src/Interface.ts index db0116e7..30f9fb29 100644 --- a/packages/server/src/Interface.ts +++ b/packages/server/src/Interface.ts @@ -138,3 +138,15 @@ export interface IDatabaseExport { chatflows: IChatFlow[] apikeys: ICommonObject[] } + +export interface IRunChatflowMessageValue { + chatflow: IChatFlow + incomingInput: IncomingInput + componentNodes: IComponentNodes + endingNodeData?: INodeData +} + +export interface IChildProcessMessage { + key: string + value?: any +} diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts index f1990630..1ce66117 100644 --- a/packages/server/src/index.ts +++ b/packages/server/src/index.ts @@ -6,7 +6,16 @@ import http from 'http' import * as fs from 'fs' import basicAuth from 'express-basic-auth' -import { IChatFlow, IncomingInput, IReactFlowNode, IReactFlowObject, INodeData, IDatabaseExport } from './Interface' +import { + IChatFlow, + IncomingInput, + IReactFlowNode, + IReactFlowObject, + INodeData, + IDatabaseExport, + IRunChatflowMessageValue, + IChildProcessMessage +} from './Interface' import { getNodeModulesPackagePath, getStartingNodes, @@ -32,6 +41,7 @@ import { ChatFlow } from './entity/ChatFlow' import { ChatMessage } from './entity/ChatMessage' import { ChatflowPool } from './ChatflowPool' import { ICommonObject } from 'flowise-components' +import { fork } from 'child_process' export class App { app: express.Application @@ -369,6 +379,12 @@ export class App { }) } + /** + * Validate API Key + * @param {Request} req + * @param {Response} res + * @param {ChatFlow} chatflow + */ async validateKey(req: Request, res: Response, chatflow: ChatFlow) { const chatFlowApiKeyId = chatflow.apikeyid const authorizationHeader = (req.headers['Authorization'] as string) ?? (req.headers['authorization'] as string) ?? '' @@ -383,6 +399,73 @@ export class App { } } + /** + * Start child process + * @param {ChatFlow} chatflow + * @param {IncomingInput} incomingInput + * @param {INodeData} endingNodeData + */ + async startChildProcess(chatflow: ChatFlow, incomingInput: IncomingInput, endingNodeData?: INodeData) { + try { + const controller = new AbortController() + const { signal } = controller + + let childpath = path.join(__dirname, '..', 'dist', 'ChildProcess.js') + if (!fs.existsSync(childpath)) childpath = 'ChildProcess.ts' + + const childProcess = fork(childpath, [], { signal }) + + const value = { + chatflow, + incomingInput, + componentNodes: cloneDeep(this.nodesPool.componentNodes), + endingNodeData + } as IRunChatflowMessageValue + childProcess.send({ key: 'start', value } as IChildProcessMessage) + + let childProcessTimeout: NodeJS.Timeout + + return new Promise((resolve, reject) => { + childProcess.on('message', async (message: IChildProcessMessage) => { + if (message.key === 'finish') { + const { result, addToChatFlowPool } = message.value as ICommonObject + if (childProcessTimeout) { + clearTimeout(childProcessTimeout) + } + if (Object.keys(addToChatFlowPool).length) { + const { chatflowid, nodeToExecuteData, startingNodes, overrideConfig } = addToChatFlowPool + this.chatflowPool.add(chatflowid, nodeToExecuteData, startingNodes, overrideConfig) + } + resolve(result) + } + if (message.key === 'start') { + if (process.env.EXECUTION_TIMEOUT) { + childProcessTimeout = setTimeout(async () => { + childProcess.kill() + resolve(undefined) + }, parseInt(process.env.EXECUTION_TIMEOUT, 10)) + } + } + if (message.key === 'error') { + let errMessage = message.value as string + if (childProcessTimeout) { + clearTimeout(childProcessTimeout) + } + reject(errMessage) + } + }) + }) + } catch (err) { + console.error(err) + } + } + + /** + * Process Prediction + * @param {Request} req + * @param {Response} res + * @param {boolean} isInternal + */ async processPrediction(req: Request, res: Response, isInternal = false) { try { const chatflowid = req.params.id @@ -427,78 +510,102 @@ export class App { * - Existing overrideConfig and new overrideConfig are the same * - Flow doesn't start with nodes that depend on incomingInput.question ***/ - if ( - Object.prototype.hasOwnProperty.call(this.chatflowPool.activeChatflows, chatflowid) && - this.chatflowPool.activeChatflows[chatflowid].inSync && - isSameOverrideConfig( - isInternal, - this.chatflowPool.activeChatflows[chatflowid].overrideConfig, - incomingInput.overrideConfig - ) && - !isStartNodeDependOnInput(this.chatflowPool.activeChatflows[chatflowid].startingNodes) - ) { - nodeToExecuteData = this.chatflowPool.activeChatflows[chatflowid].endingNodeData - } else { - /*** Get chatflows and prepare data ***/ - const flowData = chatflow.flowData - const parsedFlowData: IReactFlowObject = JSON.parse(flowData) - const nodes = parsedFlowData.nodes - const edges = parsedFlowData.edges - - /*** Get Ending Node with Directed Graph ***/ - const { graph, nodeDependencies } = constructGraphs(nodes, edges) - const directedGraph = graph - const endingNodeId = getEndingNode(nodeDependencies, directedGraph) - if (!endingNodeId) return res.status(500).send(`Ending node must be either a Chain or Agent`) - - const endingNodeData = nodes.find((nd) => nd.id === endingNodeId)?.data - if (!endingNodeData) return res.status(500).send(`Ending node must be either a Chain or Agent`) - - if ( - endingNodeData.outputs && - Object.keys(endingNodeData.outputs).length && - !Object.values(endingNodeData.outputs).includes(endingNodeData.name) - ) { - return res - .status(500) - .send( - `Output of ${endingNodeData.label} (${endingNodeData.id}) must be ${endingNodeData.label}, can't be an Output Prediction` - ) - } - - /*** Get Starting Nodes with Non-Directed Graph ***/ - const constructedObj = constructGraphs(nodes, edges, true) - const nonDirectedGraph = constructedObj.graph - const { startingNodeIds, depthQueue } = getStartingNodes(nonDirectedGraph, endingNodeId) - - /*** BFS to traverse from Starting Nodes to Ending Node ***/ - const reactFlowNodes = await buildLangchain( - startingNodeIds, - nodes, - graph, - depthQueue, - this.nodesPool.componentNodes, - incomingInput.question, - incomingInput?.overrideConfig + const isRebuildNeeded = () => { + return ( + Object.prototype.hasOwnProperty.call(this.chatflowPool.activeChatflows, chatflowid) && + this.chatflowPool.activeChatflows[chatflowid].inSync && + isSameOverrideConfig( + isInternal, + this.chatflowPool.activeChatflows[chatflowid].overrideConfig, + incomingInput.overrideConfig + ) && + !isStartNodeDependOnInput(this.chatflowPool.activeChatflows[chatflowid].startingNodes) ) - - const nodeToExecute = reactFlowNodes.find((node: IReactFlowNode) => node.id === endingNodeId) - if (!nodeToExecute) return res.status(404).send(`Node ${endingNodeId} not found`) - - const reactFlowNodeData: INodeData = resolveVariables(nodeToExecute.data, reactFlowNodes, incomingInput.question) - nodeToExecuteData = reactFlowNodeData - - const startingNodes = nodes.filter((nd) => startingNodeIds.includes(nd.id)) - this.chatflowPool.add(chatflowid, nodeToExecuteData, startingNodes, incomingInput?.overrideConfig) } - const nodeInstanceFilePath = this.nodesPool.componentNodes[nodeToExecuteData.name].filePath as string - const nodeModule = await import(nodeInstanceFilePath) - const nodeInstance = new nodeModule.nodeClass() + if (process.env.EXECUTION_MODE === 'child') { + if (isRebuildNeeded()) { + nodeToExecuteData = this.chatflowPool.activeChatflows[chatflowid].endingNodeData + try { + const result = await this.startChildProcess(chatflow, incomingInput, nodeToExecuteData) - const result = await nodeInstance.run(nodeToExecuteData, incomingInput.question, { chatHistory: incomingInput.history }) + return res.json(result) + } catch (error) { + return res.status(500).send(error) + } + } else { + try { + const result = await this.startChildProcess(chatflow, incomingInput) + return res.json(result) + } catch (error) { + return res.status(500).send(error) + } + } + } else { + if (isRebuildNeeded()) { + nodeToExecuteData = this.chatflowPool.activeChatflows[chatflowid].endingNodeData + } else { + /*** Get chatflows and prepare data ***/ + const flowData = chatflow.flowData + const parsedFlowData: IReactFlowObject = JSON.parse(flowData) + const nodes = parsedFlowData.nodes + const edges = parsedFlowData.edges - return res.json(result) + /*** Get Ending Node with Directed Graph ***/ + const { graph, nodeDependencies } = constructGraphs(nodes, edges) + const directedGraph = graph + const endingNodeId = getEndingNode(nodeDependencies, directedGraph) + if (!endingNodeId) return res.status(500).send(`Ending node must be either a Chain or Agent`) + + const endingNodeData = nodes.find((nd) => nd.id === endingNodeId)?.data + if (!endingNodeData) return res.status(500).send(`Ending node must be either a Chain or Agent`) + + if ( + endingNodeData.outputs && + Object.keys(endingNodeData.outputs).length && + !Object.values(endingNodeData.outputs).includes(endingNodeData.name) + ) { + return res + .status(500) + .send( + `Output of ${endingNodeData.label} (${endingNodeData.id}) must be ${endingNodeData.label}, can't be an Output Prediction` + ) + } + + /*** Get Starting Nodes with Non-Directed Graph ***/ + const constructedObj = constructGraphs(nodes, edges, true) + const nonDirectedGraph = constructedObj.graph + const { startingNodeIds, depthQueue } = getStartingNodes(nonDirectedGraph, endingNodeId) + + /*** BFS to traverse from Starting Nodes to Ending Node ***/ + const reactFlowNodes = await buildLangchain( + startingNodeIds, + nodes, + graph, + depthQueue, + this.nodesPool.componentNodes, + incomingInput.question, + incomingInput?.overrideConfig + ) + + const nodeToExecute = reactFlowNodes.find((node: IReactFlowNode) => node.id === endingNodeId) + if (!nodeToExecute) return res.status(404).send(`Node ${endingNodeId} not found`) + + const reactFlowNodeData: INodeData = resolveVariables(nodeToExecute.data, reactFlowNodes, incomingInput.question) + nodeToExecuteData = reactFlowNodeData + + const startingNodes = nodes.filter((nd) => startingNodeIds.includes(nd.id)) + this.chatflowPool.add(chatflowid, nodeToExecuteData, startingNodes, incomingInput?.overrideConfig) + } + + const nodeInstanceFilePath = this.nodesPool.componentNodes[nodeToExecuteData.name].filePath as string + const nodeModule = await import(nodeInstanceFilePath) + const nodeInstance = new nodeModule.nodeClass() + + const result = await nodeInstance.run(nodeToExecuteData, incomingInput.question, { chatHistory: incomingInput.history }) + + return res.json(result) + } } catch (e: any) { return res.status(500).send(e.message) }