mirror of
https://github.com/farcasclaudiu/Flowise.git
synced 2026-06-28 15:00:57 +03:00
Merge pull request #745 from FlowiseAI/chore/Remove-Child-Mode
Chore/removing child mode
This commit is contained in:
@@ -17,7 +17,6 @@ PASSPHRASE=MYPASSPHRASE # Passphrase used to create encryption key
|
||||
# FLOWISE_PASSWORD=1234
|
||||
# DEBUG=true
|
||||
# LOG_LEVEL=debug (error | warn | info | verbose | debug)
|
||||
# EXECUTION_MODE=main (child | main)
|
||||
# TOOL_FUNCTION_BUILTIN_DEP=crypto,fs
|
||||
# TOOL_FUNCTION_EXTERNAL_DEP=moment,lodash
|
||||
|
||||
|
||||
@@ -1,20 +1,20 @@
|
||||
<!-- markdownlint-disable MD030 -->
|
||||
|
||||
# Flowise - 低代码LLM应用程序构建器
|
||||
# Flowise - 低代码 LLM 应用程序构建器
|
||||
|
||||
[English](<./README.md>) | 中文
|
||||
[English](./README.md) | 中文
|
||||
|
||||

|
||||
|
||||
拖放界面来构建自定义的LLM流程
|
||||
拖放界面来构建自定义的 LLM 流程
|
||||
|
||||
## ⚡快速入门
|
||||
## ⚡ 快速入门
|
||||
|
||||
1. 安装Flowise
|
||||
1. 安装 Flowise
|
||||
```bash
|
||||
npm install -g flowise
|
||||
```
|
||||
2. 启动Flowise
|
||||
2. 启动 Flowise
|
||||
|
||||
```bash
|
||||
npx flowise start
|
||||
@@ -33,28 +33,27 @@ FLOWISE_PASSWORD=1234
|
||||
|
||||
## 🌱 环境变量
|
||||
|
||||
Flowise支持不同的环境变量来配置您的实例。您可以在`packages/server`文件夹中的`.env`文件中指定以下变量。阅读[更多](https://docs.flowiseai.com/environment-variables)
|
||||
Flowise 支持不同的环境变量来配置您的实例。您可以在`packages/server`文件夹中的`.env`文件中指定以下变量。阅读[更多](https://docs.flowiseai.com/environment-variables)
|
||||
|
||||
| 变量 | 描述 | 类型 | 默认值 |
|
||||
| ---------------- | ---------------------------------------------------------------- | ------------------------------------------------ | ----------------------------------- |
|
||||
| PORT | Flowise运行的HTTP端口 | 数字 | 3000 |
|
||||
| FLOWISE_USERNAME | 登录的用户名 | 字符串 | |
|
||||
| FLOWISE_PASSWORD | 登录的密码 | 字符串 | |
|
||||
| DEBUG | 打印组件的日志 | 布尔值 | |
|
||||
| LOG_PATH | 存储日志文件的位置 | 字符串 | `your-path/Flowise/logs` |
|
||||
| LOG_LEVEL | 日志的不同级别 | 枚举字符串:`error`、`info`、`verbose`、`debug` | `info` |
|
||||
| APIKEY_PATH | 存储API密钥的位置 | 字符串 | `your-path/Flowise/packages/server` |
|
||||
| EXECUTION_MODE | 预测是在其自己的进程中运行还是在主进程中运行 | 枚举字符串:`child`、`main` | `main` |
|
||||
| TOOL_FUNCTION_BUILTIN_DEP | 用于工具函数的NodeJS内置模块 | 字符串 | |
|
||||
| TOOL_FUNCTION_EXTERNAL_DEP | 用于工具函数的外部模块 | 字符串 | |
|
||||
| OVERRIDE_DATABASE | 使用默认值覆盖当前数据库 | 枚举字符串:`true`、`false` | `true` |
|
||||
| DATABASE_TYPE | 存储flowise数据的数据库类型 | 枚举字符串:`sqlite`、`mysql`、`postgres` | `sqlite` |
|
||||
| DATABASE_PATH | 数据库的保存位置(当DATABASE_TYPE为sqlite时) | 字符串 | `your-home-dir/.flowise` |
|
||||
| DATABASE_HOST | 主机URL或IP地址(当DATABASE_TYPE不为sqlite时) | 字符串 | |
|
||||
| DATABASE_PORT | 数据库端口(当DATABASE_TYPE不为sqlite时) | 字符串 | |
|
||||
| DATABASE_USERNAME | 数据库用户名(当DATABASE_TYPE不为sqlite时) | 字符串 | |
|
||||
| DATABASE_PASSWORD | 数据库密码(当DATABASE_TYPE不为sqlite时) | 字符串 | |
|
||||
| DATABASE_NAME | 数据库名称(当DATABASE_TYPE不为sqlite时) | 字符串 | |
|
||||
| 变量 | 描述 | 类型 | 默认值 |
|
||||
| -------------------------- | ------------------------------------------------------ | ----------------------------------------------- | ----------------------------------- |
|
||||
| PORT | Flowise 运行的 HTTP 端口 | 数字 | 3000 |
|
||||
| FLOWISE_USERNAME | 登录的用户名 | 字符串 | |
|
||||
| FLOWISE_PASSWORD | 登录的密码 | 字符串 | |
|
||||
| DEBUG | 打印组件的日志 | 布尔值 | |
|
||||
| LOG_PATH | 存储日志文件的位置 | 字符串 | `your-path/Flowise/logs` |
|
||||
| LOG_LEVEL | 日志的不同级别 | 枚举字符串:`error`、`info`、`verbose`、`debug` | `info` |
|
||||
| APIKEY_PATH | 存储 API 密钥的位置 | 字符串 | `your-path/Flowise/packages/server` |
|
||||
| TOOL_FUNCTION_BUILTIN_DEP | 用于工具函数的 NodeJS 内置模块 | 字符串 | |
|
||||
| TOOL_FUNCTION_EXTERNAL_DEP | 用于工具函数的外部模块 | 字符串 | |
|
||||
| OVERRIDE_DATABASE | 使用默认值覆盖当前数据库 | 枚举字符串:`true`、`false` | `true` |
|
||||
| DATABASE_TYPE | 存储 flowise 数据的数据库类型 | 枚举字符串:`sqlite`、`mysql`、`postgres` | `sqlite` |
|
||||
| DATABASE_PATH | 数据库的保存位置(当 DATABASE_TYPE 为 sqlite 时) | 字符串 | `your-home-dir/.flowise` |
|
||||
| DATABASE_HOST | 主机 URL 或 IP 地址(当 DATABASE_TYPE 不为 sqlite 时) | 字符串 | |
|
||||
| DATABASE_PORT | 数据库端口(当 DATABASE_TYPE 不为 sqlite 时) | 字符串 | |
|
||||
| DATABASE_USERNAME | 数据库用户名(当 DATABASE_TYPE 不为 sqlite 时) | 字符串 | |
|
||||
| DATABASE_PASSWORD | 数据库密码(当 DATABASE_TYPE 不为 sqlite 时) | 字符串 | |
|
||||
| DATABASE_NAME | 数据库名称(当 DATABASE_TYPE 不为 sqlite 时) | 字符串 | |
|
||||
|
||||
您还可以在使用`npx`时指定环境变量。例如:
|
||||
|
||||
@@ -64,7 +63,7 @@ npx flowise start --PORT=3000 --DEBUG=true
|
||||
|
||||
## 📖 文档
|
||||
|
||||
[Flowise文档](https://docs.flowiseai.com/)
|
||||
[Flowise 文档](https://docs.flowiseai.com/)
|
||||
|
||||
## 🌐 自托管
|
||||
|
||||
@@ -98,4 +97,4 @@ npx flowise start --PORT=3000 --DEBUG=true
|
||||
|
||||
## 📄 许可证
|
||||
|
||||
本仓库中的源代码在[MIT许可证](https://github.com/FlowiseAI/Flowise/blob/master/LICENSE.md)下提供。
|
||||
本仓库中的源代码在[MIT 许可证](https://github.com/FlowiseAI/Flowise/blob/master/LICENSE.md)下提供。
|
||||
|
||||
@@ -1,253 +0,0 @@
|
||||
import path from 'path'
|
||||
import { IChildProcessMessage, IReactFlowNode, IReactFlowObject, IRunChatflowMessageValue, INodeData } from './Interface'
|
||||
import {
|
||||
buildLangchain,
|
||||
checkMemorySessionId,
|
||||
constructGraphs,
|
||||
getEndingNode,
|
||||
getStartingNodes,
|
||||
getUserHome,
|
||||
replaceInputsWithConfig,
|
||||
resolveVariables,
|
||||
databaseEntities
|
||||
} from './utils'
|
||||
import { DataSource } from 'typeorm'
|
||||
import { ChatFlow } from './entity/ChatFlow'
|
||||
import { ChatMessage } from './entity/ChatMessage'
|
||||
import { Tool } from './entity/Tool'
|
||||
import { Credential } from './entity/Credential'
|
||||
import logger from './utils/logger'
|
||||
|
||||
export class ChildProcess {
|
||||
/**
|
||||
* Stop child process when app is killed
|
||||
*/
|
||||
static async stopChildProcess() {
|
||||
setTimeout(() => {
|
||||
process.exit(0)
|
||||
}, 50000)
|
||||
}
|
||||
|
||||
/**
|
||||
* Process prediction
|
||||
* @param {IRunChatflowMessageValue} messageValue
|
||||
* @return {Promise<void>}
|
||||
*/
|
||||
async runChildProcess(messageValue: IRunChatflowMessageValue): Promise<void> {
|
||||
process.on('SIGTERM', ChildProcess.stopChildProcess)
|
||||
process.on('SIGINT', ChildProcess.stopChildProcess)
|
||||
|
||||
await sendToParentProcess('start', '_')
|
||||
|
||||
try {
|
||||
const childAppDataSource = await initDB()
|
||||
|
||||
// Create a Queue and add our initial node in it
|
||||
const { endingNodeData, chatflow, chatId, incomingInput, componentNodes } = messageValue
|
||||
|
||||
let nodeToExecuteData: INodeData
|
||||
let addToChatFlowPool: any = {}
|
||||
|
||||
/* Don't rebuild the flow (to avoid duplicated upsert, recomputation) when all these conditions met:
|
||||
* - Node Data already exists in pool
|
||||
* - Still in sync (i.e the flow has not been modified since)
|
||||
* - Existing overrideConfig and new overrideConfig are the same
|
||||
* - Flow doesn't start with nodes that depend on incomingInput.question
|
||||
***/
|
||||
if (endingNodeData) {
|
||||
nodeToExecuteData = endingNodeData
|
||||
} else {
|
||||
/*** Get chatflows and prepare data ***/
|
||||
const flowData = chatflow.flowData
|
||||
const parsedFlowData: IReactFlowObject = JSON.parse(flowData)
|
||||
const nodes = parsedFlowData.nodes
|
||||
const edges = parsedFlowData.edges
|
||||
|
||||
/*** Get Ending Node with Directed Graph ***/
|
||||
const { graph, nodeDependencies } = constructGraphs(nodes, edges)
|
||||
const directedGraph = graph
|
||||
const endingNodeId = getEndingNode(nodeDependencies, directedGraph)
|
||||
if (!endingNodeId) {
|
||||
await sendToParentProcess('error', `Ending node ${endingNodeId} not found`)
|
||||
return
|
||||
}
|
||||
|
||||
const endingNodeData = nodes.find((nd) => nd.id === endingNodeId)?.data
|
||||
if (!endingNodeData) {
|
||||
await sendToParentProcess('error', `Ending node ${endingNodeId} data not found`)
|
||||
return
|
||||
}
|
||||
|
||||
if (endingNodeData && endingNodeData.category !== 'Chains' && endingNodeData.category !== 'Agents') {
|
||||
await sendToParentProcess('error', `Ending node must be either a Chain or Agent`)
|
||||
return
|
||||
}
|
||||
|
||||
if (
|
||||
endingNodeData.outputs &&
|
||||
Object.keys(endingNodeData.outputs).length &&
|
||||
!Object.values(endingNodeData.outputs).includes(endingNodeData.name)
|
||||
) {
|
||||
await sendToParentProcess(
|
||||
'error',
|
||||
`Output of ${endingNodeData.label} (${endingNodeData.id}) must be ${endingNodeData.label}, can't be an Output Prediction`
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
/*** Get Starting Nodes with Non-Directed Graph ***/
|
||||
const constructedObj = constructGraphs(nodes, edges, true)
|
||||
const nonDirectedGraph = constructedObj.graph
|
||||
const { startingNodeIds, depthQueue } = getStartingNodes(nonDirectedGraph, endingNodeId)
|
||||
|
||||
logger.debug(`[server] [mode:child]: Start building chatflow ${chatflow.id}`)
|
||||
/*** BFS to traverse from Starting Nodes to Ending Node ***/
|
||||
const reactFlowNodes = await buildLangchain(
|
||||
startingNodeIds,
|
||||
nodes,
|
||||
graph,
|
||||
depthQueue,
|
||||
componentNodes,
|
||||
incomingInput.question,
|
||||
chatId,
|
||||
childAppDataSource,
|
||||
incomingInput?.overrideConfig
|
||||
)
|
||||
|
||||
const nodeToExecute = reactFlowNodes.find((node: IReactFlowNode) => node.id === endingNodeId)
|
||||
if (!nodeToExecute) {
|
||||
await sendToParentProcess('error', `Node ${endingNodeId} not found`)
|
||||
return
|
||||
}
|
||||
|
||||
if (incomingInput.overrideConfig)
|
||||
nodeToExecute.data = replaceInputsWithConfig(nodeToExecute.data, incomingInput.overrideConfig)
|
||||
const reactFlowNodeData: INodeData = resolveVariables(nodeToExecute.data, reactFlowNodes, incomingInput.question)
|
||||
nodeToExecuteData = reactFlowNodeData
|
||||
|
||||
const startingNodes = nodes.filter((nd) => startingNodeIds.includes(nd.id))
|
||||
addToChatFlowPool = {
|
||||
chatflowid: chatflow.id,
|
||||
nodeToExecuteData,
|
||||
startingNodes,
|
||||
overrideConfig: incomingInput?.overrideConfig
|
||||
}
|
||||
}
|
||||
|
||||
const nodeInstanceFilePath = componentNodes[nodeToExecuteData.name].filePath as string
|
||||
const nodeModule = await import(nodeInstanceFilePath)
|
||||
const nodeInstance = new nodeModule.nodeClass()
|
||||
|
||||
logger.debug(`[server] [mode:child]: Running ${nodeToExecuteData.label} (${nodeToExecuteData.id})`)
|
||||
|
||||
if (nodeToExecuteData.instance) checkMemorySessionId(nodeToExecuteData.instance, chatId)
|
||||
|
||||
const result = await nodeInstance.run(nodeToExecuteData, incomingInput.question, {
|
||||
chatHistory: incomingInput.history,
|
||||
appDataSource: childAppDataSource,
|
||||
databaseEntities
|
||||
})
|
||||
|
||||
logger.debug(`[server] [mode:child]: Finished running ${nodeToExecuteData.label} (${nodeToExecuteData.id})`)
|
||||
|
||||
await sendToParentProcess('finish', { result, addToChatFlowPool })
|
||||
} catch (e: any) {
|
||||
await sendToParentProcess('error', e.message)
|
||||
logger.error('[server] [mode:child]: Error:', e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize DB in child process
|
||||
* @returns {DataSource}
|
||||
*/
|
||||
async function initDB() {
|
||||
let childAppDataSource
|
||||
let homePath
|
||||
const synchronize = process.env.OVERRIDE_DATABASE === 'false' ? false : true
|
||||
switch (process.env.DATABASE_TYPE) {
|
||||
case 'sqlite':
|
||||
homePath = process.env.DATABASE_PATH ?? path.join(getUserHome(), '.flowise')
|
||||
childAppDataSource = new DataSource({
|
||||
type: 'sqlite',
|
||||
database: path.resolve(homePath, 'database.sqlite'),
|
||||
synchronize,
|
||||
entities: [ChatFlow, ChatMessage, Tool, Credential],
|
||||
migrations: []
|
||||
})
|
||||
break
|
||||
case 'mysql':
|
||||
childAppDataSource = new DataSource({
|
||||
type: 'mysql',
|
||||
host: process.env.DATABASE_HOST,
|
||||
port: parseInt(process.env.DATABASE_PORT || '3306'),
|
||||
username: process.env.DATABASE_USER,
|
||||
password: process.env.DATABASE_PASSWORD,
|
||||
database: process.env.DATABASE_NAME,
|
||||
charset: 'utf8mb4',
|
||||
synchronize,
|
||||
entities: [ChatFlow, ChatMessage, Tool, Credential],
|
||||
migrations: []
|
||||
})
|
||||
break
|
||||
case 'postgres':
|
||||
childAppDataSource = new DataSource({
|
||||
type: 'postgres',
|
||||
host: process.env.DATABASE_HOST,
|
||||
port: parseInt(process.env.DATABASE_PORT || '5432'),
|
||||
username: process.env.DATABASE_USER,
|
||||
password: process.env.DATABASE_PASSWORD,
|
||||
database: process.env.DATABASE_NAME,
|
||||
synchronize,
|
||||
entities: [ChatFlow, ChatMessage, Tool, Credential],
|
||||
migrations: []
|
||||
})
|
||||
break
|
||||
default:
|
||||
homePath = process.env.DATABASE_PATH ?? path.join(getUserHome(), '.flowise')
|
||||
childAppDataSource = new DataSource({
|
||||
type: 'sqlite',
|
||||
database: path.resolve(homePath, 'database.sqlite'),
|
||||
synchronize,
|
||||
entities: [ChatFlow, ChatMessage, Tool, Credential],
|
||||
migrations: []
|
||||
})
|
||||
break
|
||||
}
|
||||
|
||||
return await childAppDataSource.initialize()
|
||||
}
|
||||
|
||||
/**
|
||||
* Send data back to parent process
|
||||
* @param {string} key Key of message
|
||||
* @param {*} value Value of message
|
||||
* @returns {Promise<void>}
|
||||
*/
|
||||
async function sendToParentProcess(key: string, value: any): Promise<void> {
|
||||
// tslint:disable-line:no-any
|
||||
return new Promise((resolve, reject) => {
|
||||
process.send!(
|
||||
{
|
||||
key,
|
||||
value
|
||||
},
|
||||
(error: Error) => {
|
||||
if (error) {
|
||||
return reject(error)
|
||||
}
|
||||
resolve()
|
||||
}
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
const childProcess = new ChildProcess()
|
||||
|
||||
process.on('message', async (message: IChildProcessMessage) => {
|
||||
if (message.key === 'start') {
|
||||
await childProcess.runChildProcess(message.value)
|
||||
process.exit()
|
||||
}
|
||||
})
|
||||
@@ -169,19 +169,6 @@ export interface IDatabaseExport {
|
||||
apikeys: ICommonObject[]
|
||||
}
|
||||
|
||||
export interface IRunChatflowMessageValue {
|
||||
chatflow: IChatFlow
|
||||
chatId: string
|
||||
incomingInput: IncomingInput
|
||||
componentNodes: IComponentNodes
|
||||
endingNodeData?: INodeData
|
||||
}
|
||||
|
||||
export interface IChildProcessMessage {
|
||||
key: string
|
||||
value?: any
|
||||
}
|
||||
|
||||
export type ICredentialDataDecrypted = ICommonObject
|
||||
|
||||
// Plain credential object sent to server
|
||||
|
||||
@@ -25,7 +25,6 @@ export default class Start extends Command {
|
||||
SECRETKEY_PATH: Flags.string(),
|
||||
LOG_PATH: Flags.string(),
|
||||
LOG_LEVEL: Flags.string(),
|
||||
EXECUTION_MODE: Flags.string(),
|
||||
TOOL_FUNCTION_BUILTIN_DEP: Flags.string(),
|
||||
TOOL_FUNCTION_EXTERNAL_DEP: Flags.string(),
|
||||
OVERRIDE_DATABASE: Flags.string(),
|
||||
@@ -73,7 +72,6 @@ export default class Start extends Command {
|
||||
const { flags } = await this.parse(Start)
|
||||
|
||||
if (flags.PORT) process.env.PORT = flags.PORT
|
||||
if (flags.EXECUTION_MODE) process.env.EXECUTION_MODE = flags.EXECUTION_MODE
|
||||
if (flags.DEBUG) process.env.DEBUG = flags.DEBUG
|
||||
|
||||
// Authorization
|
||||
|
||||
+88
-175
@@ -16,8 +16,6 @@ import {
|
||||
IReactFlowObject,
|
||||
INodeData,
|
||||
IDatabaseExport,
|
||||
IRunChatflowMessageValue,
|
||||
IChildProcessMessage,
|
||||
ICredentialReturnResponse
|
||||
} from './Interface'
|
||||
import {
|
||||
@@ -57,7 +55,6 @@ import { Credential } from './entity/Credential'
|
||||
import { Tool } from './entity/Tool'
|
||||
import { ChatflowPool } from './ChatflowPool'
|
||||
import { ICommonObject, INodeOptionsValue } from 'flowise-components'
|
||||
import { fork } from 'child_process'
|
||||
|
||||
export class App {
|
||||
app: express.Application
|
||||
@@ -764,68 +761,6 @@ export class App {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Start child process
|
||||
* @param {ChatFlow} chatflow
|
||||
* @param {IncomingInput} incomingInput
|
||||
* @param {INodeData} endingNodeData
|
||||
*/
|
||||
async startChildProcess(chatflow: ChatFlow, chatId: string, incomingInput: IncomingInput, endingNodeData?: INodeData) {
|
||||
try {
|
||||
const controller = new AbortController()
|
||||
const { signal } = controller
|
||||
|
||||
let childpath = path.join(__dirname, '..', 'dist', 'ChildProcess.js')
|
||||
if (!fs.existsSync(childpath)) childpath = 'ChildProcess.ts'
|
||||
|
||||
const childProcess = fork(childpath, [], { signal })
|
||||
|
||||
const value = {
|
||||
chatflow,
|
||||
chatId,
|
||||
incomingInput,
|
||||
componentNodes: cloneDeep(this.nodesPool.componentNodes),
|
||||
endingNodeData
|
||||
} as IRunChatflowMessageValue
|
||||
childProcess.send({ key: 'start', value } as IChildProcessMessage)
|
||||
|
||||
let childProcessTimeout: NodeJS.Timeout
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
childProcess.on('message', async (message: IChildProcessMessage) => {
|
||||
if (message.key === 'finish') {
|
||||
const { result, addToChatFlowPool } = message.value as ICommonObject
|
||||
if (childProcessTimeout) {
|
||||
clearTimeout(childProcessTimeout)
|
||||
}
|
||||
if (Object.keys(addToChatFlowPool).length) {
|
||||
const { chatflowid, nodeToExecuteData, startingNodes, overrideConfig } = addToChatFlowPool
|
||||
this.chatflowPool.add(chatflowid, nodeToExecuteData, startingNodes, overrideConfig)
|
||||
}
|
||||
resolve(result)
|
||||
}
|
||||
if (message.key === 'start') {
|
||||
if (process.env.EXECUTION_TIMEOUT) {
|
||||
childProcessTimeout = setTimeout(async () => {
|
||||
childProcess.kill()
|
||||
resolve(undefined)
|
||||
}, parseInt(process.env.EXECUTION_TIMEOUT, 10))
|
||||
}
|
||||
}
|
||||
if (message.key === 'error') {
|
||||
let errMessage = message.value as string
|
||||
if (childProcessTimeout) {
|
||||
clearTimeout(childProcessTimeout)
|
||||
}
|
||||
reject(errMessage)
|
||||
}
|
||||
})
|
||||
})
|
||||
} catch (err) {
|
||||
logger.error('[server] [mode:child]: Error:', err)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Process Prediction
|
||||
* @param {Request} req
|
||||
@@ -895,126 +830,104 @@ export class App {
|
||||
)
|
||||
}
|
||||
|
||||
if (process.env.EXECUTION_MODE === 'child') {
|
||||
if (isFlowReusable()) {
|
||||
nodeToExecuteData = this.chatflowPool.activeChatflows[chatflowid].endingNodeData
|
||||
logger.debug(
|
||||
`[server] [mode:child]: Reuse existing chatflow ${chatflowid} with ending node ${nodeToExecuteData.label} (${nodeToExecuteData.id})`
|
||||
)
|
||||
try {
|
||||
const result = await this.startChildProcess(chatflow, chatId, incomingInput, nodeToExecuteData)
|
||||
return res.json(result)
|
||||
} catch (error) {
|
||||
return res.status(500).send(error)
|
||||
}
|
||||
} else {
|
||||
try {
|
||||
const result = await this.startChildProcess(chatflow, chatId, incomingInput)
|
||||
return res.json(result)
|
||||
} catch (error) {
|
||||
return res.status(500).send(error)
|
||||
}
|
||||
}
|
||||
/*** Get chatflows and prepare data ***/
|
||||
const flowData = chatflow.flowData
|
||||
const parsedFlowData: IReactFlowObject = JSON.parse(flowData)
|
||||
const nodes = parsedFlowData.nodes
|
||||
const edges = parsedFlowData.edges
|
||||
|
||||
if (isFlowReusable()) {
|
||||
nodeToExecuteData = this.chatflowPool.activeChatflows[chatflowid].endingNodeData
|
||||
isStreamValid = isFlowValidForStream(nodes, nodeToExecuteData)
|
||||
logger.debug(
|
||||
`[server]: Reuse existing chatflow ${chatflowid} with ending node ${nodeToExecuteData.label} (${nodeToExecuteData.id})`
|
||||
)
|
||||
} else {
|
||||
/*** Get chatflows and prepare data ***/
|
||||
const flowData = chatflow.flowData
|
||||
const parsedFlowData: IReactFlowObject = JSON.parse(flowData)
|
||||
const nodes = parsedFlowData.nodes
|
||||
const edges = parsedFlowData.edges
|
||||
/*** Get Ending Node with Directed Graph ***/
|
||||
const { graph, nodeDependencies } = constructGraphs(nodes, edges)
|
||||
const directedGraph = graph
|
||||
const endingNodeId = getEndingNode(nodeDependencies, directedGraph)
|
||||
if (!endingNodeId) return res.status(500).send(`Ending node ${endingNodeId} not found`)
|
||||
|
||||
if (isFlowReusable()) {
|
||||
nodeToExecuteData = this.chatflowPool.activeChatflows[chatflowid].endingNodeData
|
||||
isStreamValid = isFlowValidForStream(nodes, nodeToExecuteData)
|
||||
logger.debug(
|
||||
`[server]: Reuse existing chatflow ${chatflowid} with ending node ${nodeToExecuteData.label} (${nodeToExecuteData.id})`
|
||||
)
|
||||
} else {
|
||||
/*** Get Ending Node with Directed Graph ***/
|
||||
const { graph, nodeDependencies } = constructGraphs(nodes, edges)
|
||||
const directedGraph = graph
|
||||
const endingNodeId = getEndingNode(nodeDependencies, directedGraph)
|
||||
if (!endingNodeId) return res.status(500).send(`Ending node ${endingNodeId} not found`)
|
||||
const endingNodeData = nodes.find((nd) => nd.id === endingNodeId)?.data
|
||||
if (!endingNodeData) return res.status(500).send(`Ending node ${endingNodeId} data not found`)
|
||||
|
||||
const endingNodeData = nodes.find((nd) => nd.id === endingNodeId)?.data
|
||||
if (!endingNodeData) return res.status(500).send(`Ending node ${endingNodeId} data not found`)
|
||||
|
||||
if (endingNodeData && endingNodeData.category !== 'Chains' && endingNodeData.category !== 'Agents') {
|
||||
return res.status(500).send(`Ending node must be either a Chain or Agent`)
|
||||
}
|
||||
|
||||
if (
|
||||
endingNodeData.outputs &&
|
||||
Object.keys(endingNodeData.outputs).length &&
|
||||
!Object.values(endingNodeData.outputs).includes(endingNodeData.name)
|
||||
) {
|
||||
return res
|
||||
.status(500)
|
||||
.send(
|
||||
`Output of ${endingNodeData.label} (${endingNodeData.id}) must be ${endingNodeData.label}, can't be an Output Prediction`
|
||||
)
|
||||
}
|
||||
|
||||
isStreamValid = isFlowValidForStream(nodes, endingNodeData)
|
||||
|
||||
/*** Get Starting Nodes with Non-Directed Graph ***/
|
||||
const constructedObj = constructGraphs(nodes, edges, true)
|
||||
const nonDirectedGraph = constructedObj.graph
|
||||
const { startingNodeIds, depthQueue } = getStartingNodes(nonDirectedGraph, endingNodeId)
|
||||
|
||||
logger.debug(`[server]: Start building chatflow ${chatflowid}`)
|
||||
/*** BFS to traverse from Starting Nodes to Ending Node ***/
|
||||
const reactFlowNodes = await buildLangchain(
|
||||
startingNodeIds,
|
||||
nodes,
|
||||
graph,
|
||||
depthQueue,
|
||||
this.nodesPool.componentNodes,
|
||||
incomingInput.question,
|
||||
chatId,
|
||||
this.AppDataSource,
|
||||
incomingInput?.overrideConfig
|
||||
)
|
||||
|
||||
const nodeToExecute = reactFlowNodes.find((node: IReactFlowNode) => node.id === endingNodeId)
|
||||
if (!nodeToExecute) return res.status(404).send(`Node ${endingNodeId} not found`)
|
||||
|
||||
if (incomingInput.overrideConfig)
|
||||
nodeToExecute.data = replaceInputsWithConfig(nodeToExecute.data, incomingInput.overrideConfig)
|
||||
const reactFlowNodeData: INodeData = resolveVariables(nodeToExecute.data, reactFlowNodes, incomingInput.question)
|
||||
nodeToExecuteData = reactFlowNodeData
|
||||
|
||||
const startingNodes = nodes.filter((nd) => startingNodeIds.includes(nd.id))
|
||||
this.chatflowPool.add(chatflowid, nodeToExecuteData, startingNodes, incomingInput?.overrideConfig)
|
||||
if (endingNodeData && endingNodeData.category !== 'Chains' && endingNodeData.category !== 'Agents') {
|
||||
return res.status(500).send(`Ending node must be either a Chain or Agent`)
|
||||
}
|
||||
|
||||
const nodeInstanceFilePath = this.nodesPool.componentNodes[nodeToExecuteData.name].filePath as string
|
||||
const nodeModule = await import(nodeInstanceFilePath)
|
||||
const nodeInstance = new nodeModule.nodeClass()
|
||||
if (
|
||||
endingNodeData.outputs &&
|
||||
Object.keys(endingNodeData.outputs).length &&
|
||||
!Object.values(endingNodeData.outputs).includes(endingNodeData.name)
|
||||
) {
|
||||
return res
|
||||
.status(500)
|
||||
.send(
|
||||
`Output of ${endingNodeData.label} (${endingNodeData.id}) must be ${endingNodeData.label}, can't be an Output Prediction`
|
||||
)
|
||||
}
|
||||
|
||||
isStreamValid = isStreamValid && !isVectorStoreFaiss(nodeToExecuteData)
|
||||
logger.debug(`[server]: Running ${nodeToExecuteData.label} (${nodeToExecuteData.id})`)
|
||||
isStreamValid = isFlowValidForStream(nodes, endingNodeData)
|
||||
|
||||
if (nodeToExecuteData.instance) checkMemorySessionId(nodeToExecuteData.instance, chatId)
|
||||
/*** Get Starting Nodes with Non-Directed Graph ***/
|
||||
const constructedObj = constructGraphs(nodes, edges, true)
|
||||
const nonDirectedGraph = constructedObj.graph
|
||||
const { startingNodeIds, depthQueue } = getStartingNodes(nonDirectedGraph, endingNodeId)
|
||||
|
||||
const result = isStreamValid
|
||||
? await nodeInstance.run(nodeToExecuteData, incomingInput.question, {
|
||||
chatHistory: incomingInput.history,
|
||||
socketIO,
|
||||
socketIOClientId: incomingInput.socketIOClientId,
|
||||
logger,
|
||||
appDataSource: this.AppDataSource,
|
||||
databaseEntities
|
||||
})
|
||||
: await nodeInstance.run(nodeToExecuteData, incomingInput.question, {
|
||||
chatHistory: incomingInput.history,
|
||||
logger,
|
||||
appDataSource: this.AppDataSource,
|
||||
databaseEntities
|
||||
})
|
||||
logger.debug(`[server]: Start building chatflow ${chatflowid}`)
|
||||
/*** BFS to traverse from Starting Nodes to Ending Node ***/
|
||||
const reactFlowNodes = await buildLangchain(
|
||||
startingNodeIds,
|
||||
nodes,
|
||||
graph,
|
||||
depthQueue,
|
||||
this.nodesPool.componentNodes,
|
||||
incomingInput.question,
|
||||
chatId,
|
||||
this.AppDataSource,
|
||||
incomingInput?.overrideConfig
|
||||
)
|
||||
|
||||
logger.debug(`[server]: Finished running ${nodeToExecuteData.label} (${nodeToExecuteData.id})`)
|
||||
return res.json(result)
|
||||
const nodeToExecute = reactFlowNodes.find((node: IReactFlowNode) => node.id === endingNodeId)
|
||||
if (!nodeToExecute) return res.status(404).send(`Node ${endingNodeId} not found`)
|
||||
|
||||
if (incomingInput.overrideConfig)
|
||||
nodeToExecute.data = replaceInputsWithConfig(nodeToExecute.data, incomingInput.overrideConfig)
|
||||
const reactFlowNodeData: INodeData = resolveVariables(nodeToExecute.data, reactFlowNodes, incomingInput.question)
|
||||
nodeToExecuteData = reactFlowNodeData
|
||||
|
||||
const startingNodes = nodes.filter((nd) => startingNodeIds.includes(nd.id))
|
||||
this.chatflowPool.add(chatflowid, nodeToExecuteData, startingNodes, incomingInput?.overrideConfig)
|
||||
}
|
||||
|
||||
const nodeInstanceFilePath = this.nodesPool.componentNodes[nodeToExecuteData.name].filePath as string
|
||||
const nodeModule = await import(nodeInstanceFilePath)
|
||||
const nodeInstance = new nodeModule.nodeClass()
|
||||
|
||||
isStreamValid = isStreamValid && !isVectorStoreFaiss(nodeToExecuteData)
|
||||
logger.debug(`[server]: Running ${nodeToExecuteData.label} (${nodeToExecuteData.id})`)
|
||||
|
||||
if (nodeToExecuteData.instance) checkMemorySessionId(nodeToExecuteData.instance, chatId)
|
||||
|
||||
const result = isStreamValid
|
||||
? await nodeInstance.run(nodeToExecuteData, incomingInput.question, {
|
||||
chatHistory: incomingInput.history,
|
||||
socketIO,
|
||||
socketIOClientId: incomingInput.socketIOClientId,
|
||||
logger,
|
||||
appDataSource: this.AppDataSource,
|
||||
databaseEntities
|
||||
})
|
||||
: await nodeInstance.run(nodeToExecuteData, incomingInput.question, {
|
||||
chatHistory: incomingInput.history,
|
||||
logger,
|
||||
appDataSource: this.AppDataSource,
|
||||
databaseEntities
|
||||
})
|
||||
|
||||
logger.debug(`[server]: Finished running ${nodeToExecuteData.label} (${nodeToExecuteData.id})`)
|
||||
return res.json(result)
|
||||
} catch (e: any) {
|
||||
logger.error('[server]: Error:', e)
|
||||
return res.status(500).send(e.message)
|
||||
|
||||
@@ -791,7 +791,7 @@ export const isFlowValidForStream = (reactFlowNodes: IReactFlowNode[], endingNod
|
||||
isValidChainOrAgent = whitelistAgents.includes(endingNodeData.name)
|
||||
}
|
||||
|
||||
return isChatOrLLMsExist && isValidChainOrAgent && !isVectorStoreFaiss(endingNodeData) && process.env.EXECUTION_MODE !== 'child'
|
||||
return isChatOrLLMsExist && isValidChainOrAgent && !isVectorStoreFaiss(endingNodeData)
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
Reference in New Issue
Block a user