mirror of
https://github.com/farcasclaudiu/Flowise.git
synced 2026-06-28 17:01:00 +03:00
add child processes
This commit is contained in:
@@ -0,0 +1,148 @@
|
||||
import { IChildProcessMessage, IReactFlowNode, IReactFlowObject, IRunChatflowMessageValue, INodeData } from './Interface'
|
||||
import { buildLangchain, constructGraphs, getEndingNode, getStartingNodes, resolveVariables } from './utils'
|
||||
|
||||
export class ChildProcess {
|
||||
/**
|
||||
* Stop child process when app is killed
|
||||
*/
|
||||
static async stopChildProcess() {
|
||||
setTimeout(() => {
|
||||
process.exit(0)
|
||||
}, 50000)
|
||||
}
|
||||
|
||||
/**
|
||||
* Process prediction
|
||||
* @param {IRunChatflowMessageValue} messageValue
|
||||
* @return {Promise<void>}
|
||||
*/
|
||||
async runChildProcess(messageValue: IRunChatflowMessageValue): Promise<void> {
|
||||
process.on('SIGTERM', ChildProcess.stopChildProcess)
|
||||
process.on('SIGINT', ChildProcess.stopChildProcess)
|
||||
|
||||
await sendToParentProcess('start', '_')
|
||||
|
||||
// Create a Queue and add our initial node in it
|
||||
const { endingNodeData, chatflow, incomingInput, componentNodes } = messageValue
|
||||
|
||||
let nodeToExecuteData: INodeData
|
||||
let addToChatFlowPool: any = {}
|
||||
|
||||
/* Don't rebuild the flow (to avoid duplicated upsert, recomputation) when all these conditions met:
|
||||
* - Node Data already exists in pool
|
||||
* - Still in sync (i.e the flow has not been modified since)
|
||||
* - Existing overrideConfig and new overrideConfig are the same
|
||||
* - Flow doesn't start with nodes that depend on incomingInput.question
|
||||
***/
|
||||
if (endingNodeData) {
|
||||
nodeToExecuteData = endingNodeData
|
||||
} else {
|
||||
/*** Get chatflows and prepare data ***/
|
||||
const flowData = chatflow.flowData
|
||||
const parsedFlowData: IReactFlowObject = JSON.parse(flowData)
|
||||
const nodes = parsedFlowData.nodes
|
||||
const edges = parsedFlowData.edges
|
||||
|
||||
/*** Get Ending Node with Directed Graph ***/
|
||||
const { graph, nodeDependencies } = constructGraphs(nodes, edges)
|
||||
const directedGraph = graph
|
||||
const endingNodeId = getEndingNode(nodeDependencies, directedGraph)
|
||||
if (!endingNodeId) {
|
||||
await sendToParentProcess('error', `Ending node must be either a Chain or Agent`)
|
||||
return
|
||||
}
|
||||
|
||||
const endingNodeData = nodes.find((nd) => nd.id === endingNodeId)?.data
|
||||
if (!endingNodeData) {
|
||||
await sendToParentProcess('error', `Ending node must be either a Chain or Agent`)
|
||||
return
|
||||
}
|
||||
|
||||
if (
|
||||
endingNodeData.outputs &&
|
||||
Object.keys(endingNodeData.outputs).length &&
|
||||
!Object.values(endingNodeData.outputs).includes(endingNodeData.name)
|
||||
) {
|
||||
await sendToParentProcess(
|
||||
'error',
|
||||
`Output of ${endingNodeData.label} (${endingNodeData.id}) must be ${endingNodeData.label}, can't be an Output Prediction`
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
/*** Get Starting Nodes with Non-Directed Graph ***/
|
||||
const constructedObj = constructGraphs(nodes, edges, true)
|
||||
const nonDirectedGraph = constructedObj.graph
|
||||
const { startingNodeIds, depthQueue } = getStartingNodes(nonDirectedGraph, endingNodeId)
|
||||
|
||||
/*** BFS to traverse from Starting Nodes to Ending Node ***/
|
||||
const reactFlowNodes = await buildLangchain(
|
||||
startingNodeIds,
|
||||
nodes,
|
||||
graph,
|
||||
depthQueue,
|
||||
componentNodes,
|
||||
incomingInput.question,
|
||||
incomingInput?.overrideConfig
|
||||
)
|
||||
|
||||
const nodeToExecute = reactFlowNodes.find((node: IReactFlowNode) => node.id === endingNodeId)
|
||||
if (!nodeToExecute) {
|
||||
await sendToParentProcess('error', `Node ${endingNodeId} not found`)
|
||||
return
|
||||
}
|
||||
|
||||
const reactFlowNodeData: INodeData = resolveVariables(nodeToExecute.data, reactFlowNodes, incomingInput.question)
|
||||
nodeToExecuteData = reactFlowNodeData
|
||||
|
||||
const startingNodes = nodes.filter((nd) => startingNodeIds.includes(nd.id))
|
||||
addToChatFlowPool = {
|
||||
chatflowid: chatflow.id,
|
||||
nodeToExecuteData,
|
||||
startingNodes,
|
||||
overrideConfig: incomingInput?.overrideConfig
|
||||
}
|
||||
}
|
||||
|
||||
const nodeInstanceFilePath = componentNodes[nodeToExecuteData.name].filePath as string
|
||||
const nodeModule = await import(nodeInstanceFilePath)
|
||||
const nodeInstance = new nodeModule.nodeClass()
|
||||
|
||||
const result = await nodeInstance.run(nodeToExecuteData, incomingInput.question, { chatHistory: incomingInput.history })
|
||||
|
||||
await sendToParentProcess('finish', { result, addToChatFlowPool })
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Send data back to parent process
|
||||
* @param {string} key Key of message
|
||||
* @param {*} value Value of message
|
||||
* @returns {Promise<void>}
|
||||
*/
|
||||
async function sendToParentProcess(key: string, value: any): Promise<void> {
|
||||
// tslint:disable-line:no-any
|
||||
return new Promise((resolve, reject) => {
|
||||
process.send!(
|
||||
{
|
||||
key,
|
||||
value
|
||||
},
|
||||
(error: Error) => {
|
||||
if (error) {
|
||||
return reject(error)
|
||||
}
|
||||
resolve()
|
||||
}
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
const childProcess = new ChildProcess()
|
||||
|
||||
process.on('message', async (message: IChildProcessMessage) => {
|
||||
if (message.key === 'start') {
|
||||
await childProcess.runChildProcess(message.value)
|
||||
process.exit()
|
||||
}
|
||||
})
|
||||
Reference in New Issue
Block a user