Optimize getEndingNodes (#2133)

This commit is contained in:
YISH
2024-04-19 09:12:24 +08:00
committed by GitHub
parent 6bd8aaefc8
commit b7e4fc9517
3 changed files with 69 additions and 57 deletions
@@ -41,40 +41,20 @@ const checkIfChatflowIsValidForStreaming = async (chatflowId: string): Promise<a
const edges = parsedFlowData.edges const edges = parsedFlowData.edges
const { graph, nodeDependencies } = constructGraphs(nodes, edges) const { graph, nodeDependencies } = constructGraphs(nodes, edges)
const endingNodeIds = getEndingNodes(nodeDependencies, graph) const endingNodes = getEndingNodes(nodeDependencies, graph, nodes)
if (!endingNodeIds.length) {
throw new InternalFlowiseError(StatusCodes.NOT_FOUND, `Ending nodes not found`)
}
const endingNodes = nodes.filter((nd) => endingNodeIds.includes(nd.id))
let isStreaming = false let isStreaming = false
let isEndingNodeExists = endingNodes.find((node) => node.data?.outputs?.output === 'EndingNode')
for (const endingNode of endingNodes) { for (const endingNode of endingNodes) {
const endingNodeData = endingNode.data const endingNodeData = endingNode.data
if (!endingNodeData) {
throw new InternalFlowiseError(StatusCodes.NOT_FOUND, `Ending node ${endingNode.id} data not found`)
}
const isEndingNode = endingNodeData?.outputs?.output === 'EndingNode' const isEndingNode = endingNodeData?.outputs?.output === 'EndingNode'
// Once custom function ending node exists, flow is always unavailable to stream
if (!isEndingNode) { if (isEndingNode) {
if ( return { isStreaming: false }
endingNodeData &&
endingNodeData.category !== 'Chains' &&
endingNodeData.category !== 'Agents' &&
endingNodeData.category !== 'Engine'
) {
throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, `Ending node must be either a Chain or Agent`)
}
} }
isStreaming = isFlowValidForStream(nodes, endingNodeData)
isStreaming = isEndingNode ? false : isFlowValidForStream(nodes, endingNodeData)
} }
// Once custom function ending node exists, flow is always unavailable to stream const dbResponse = { isStreaming: isStreaming }
const dbResponse = { isStreaming: isEndingNodeExists ? false : isStreaming }
return dbResponse return dbResponse
} catch (error) { } catch (error) {
throw new InternalFlowiseError( throw new InternalFlowiseError(
+15 -29
View File
@@ -179,50 +179,35 @@ export const utilBuildChatflow = async (req: Request, socketIO?: Server, isInter
/*** Get Ending Node with Directed Graph ***/ /*** Get Ending Node with Directed Graph ***/
const { graph, nodeDependencies } = constructGraphs(nodes, edges) const { graph, nodeDependencies } = constructGraphs(nodes, edges)
const directedGraph = graph const directedGraph = graph
const endingNodeIds = getEndingNodes(nodeDependencies, directedGraph)
if (!endingNodeIds.length) {
throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, `Ending nodes not found`)
}
const endingNodes = nodes.filter((nd) => endingNodeIds.includes(nd.id)) const endingNodes = getEndingNodes(nodeDependencies, directedGraph, nodes)
let isEndingNodeExists = endingNodes.find((node) => node.data?.outputs?.output === 'EndingNode') let isCustomFunctionEndingNode = endingNodes.some((node) => node.data?.outputs?.output === 'EndingNode')
for (const endingNode of endingNodes) { for (const endingNode of endingNodes) {
const endingNodeData = endingNode.data const endingNodeData = endingNode.data
if (!endingNodeData) {
throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, `Ending node ${endingNode.id} data not found`)
}
const isEndingNode = endingNodeData?.outputs?.output === 'EndingNode' const isEndingNode = endingNodeData?.outputs?.output === 'EndingNode'
if (!isEndingNode) { // Once custom function ending node exists, no need to do follow-up checks.
if ( if (isEndingNode) continue
endingNodeData &&
endingNodeData.category !== 'Chains' &&
endingNodeData.category !== 'Agents' &&
endingNodeData.category !== 'Engine'
) {
throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, `Ending node must be either a Chain or Agent`)
}
if ( if (
endingNodeData.outputs && endingNodeData.outputs &&
Object.keys(endingNodeData.outputs).length && Object.keys(endingNodeData.outputs).length &&
!Object.values(endingNodeData.outputs ?? {}).includes(endingNodeData.name) !Object.values(endingNodeData.outputs ?? {}).includes(endingNodeData.name)
) { ) {
throw new InternalFlowiseError( throw new InternalFlowiseError(
StatusCodes.INTERNAL_SERVER_ERROR, StatusCodes.INTERNAL_SERVER_ERROR,
`Output of ${endingNodeData.label} (${endingNodeData.id}) must be ${endingNodeData.label}, can't be an Output Prediction` `Output of ${endingNodeData.label} (${endingNodeData.id}) must be ${endingNodeData.label}, can't be an Output Prediction`
) )
}
} }
isStreamValid = isFlowValidForStream(nodes, endingNodeData) isStreamValid = isFlowValidForStream(nodes, endingNodeData)
} }
// Once custom function ending node exists, flow is always unavailable to stream // Once custom function ending node exists, flow is always unavailable to stream
isStreamValid = isEndingNodeExists ? false : isStreamValid isStreamValid = isCustomFunctionEndingNode ? false : isStreamValid
let chatHistory: IMessage[] = [] let chatHistory: IMessage[] = []
@@ -253,6 +238,7 @@ export const utilBuildChatflow = async (req: Request, socketIO?: Server, isInter
const nonDirectedGraph = constructedObj.graph const nonDirectedGraph = constructedObj.graph
let startingNodeIds: string[] = [] let startingNodeIds: string[] = []
let depthQueue: IDepthQueue = {} let depthQueue: IDepthQueue = {}
const endingNodeIds = endingNodes.map((n) => n.id)
for (const endingNodeId of endingNodeIds) { for (const endingNodeId of endingNodeIds) {
const resx = getStartingNodes(nonDirectedGraph, endingNodeId) const resx = getStartingNodes(nonDirectedGraph, endingNodeId)
startingNodeIds.push(...resx.startingNodeIds) startingNodeIds.push(...resx.startingNodeIds)
+48 -2
View File
@@ -41,6 +41,8 @@ import { Assistant } from '../database/entities/Assistant'
import { DataSource } from 'typeorm' import { DataSource } from 'typeorm'
import { CachePool } from '../CachePool' import { CachePool } from '../CachePool'
import { Variable } from '../database/entities/Variable' import { Variable } from '../database/entities/Variable'
import { InternalFlowiseError } from '../errors/internalFlowiseError'
import { StatusCodes } from 'http-status-codes'
const QUESTION_VAR_PREFIX = 'question' const QUESTION_VAR_PREFIX = 'question'
const CHAT_HISTORY_VAR_PREFIX = 'chat_history' const CHAT_HISTORY_VAR_PREFIX = 'chat_history'
@@ -224,8 +226,13 @@ export const getAllConnectedNodes = (graph: INodeDirectedGraph, startNodeId: str
* Get ending node and check if flow is valid * Get ending node and check if flow is valid
* @param {INodeDependencies} nodeDependencies * @param {INodeDependencies} nodeDependencies
* @param {INodeDirectedGraph} graph * @param {INodeDirectedGraph} graph
* @param {IReactFlowNode[]} allNodes
*/ */
export const getEndingNodes = (nodeDependencies: INodeDependencies, graph: INodeDirectedGraph) => { export const getEndingNodes = (
nodeDependencies: INodeDependencies,
graph: INodeDirectedGraph,
allNodes: IReactFlowNode[]
): IReactFlowNode[] => {
const endingNodeIds: string[] = [] const endingNodeIds: string[] = []
Object.keys(graph).forEach((nodeId) => { Object.keys(graph).forEach((nodeId) => {
if (Object.keys(nodeDependencies).length === 1) { if (Object.keys(nodeDependencies).length === 1) {
@@ -234,7 +241,46 @@ export const getEndingNodes = (nodeDependencies: INodeDependencies, graph: INode
endingNodeIds.push(nodeId) endingNodeIds.push(nodeId)
} }
}) })
return endingNodeIds
let endingNodes = allNodes.filter((nd) => endingNodeIds.includes(nd.id))
// If there are multiple endingnodes, the failed ones will be automatically ignored.
// And only ensure that at least one can pass the verification.
const verifiedEndingNodes: typeof endingNodes = []
let error: InternalFlowiseError | null = null
for (const endingNode of endingNodes) {
const endingNodeData = endingNode.data
if (!endingNodeData) {
error = new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, `Ending node ${endingNode.id} data not found`)
continue
}
const isEndingNode = endingNodeData?.outputs?.output === 'EndingNode'
if (!isEndingNode) {
if (
endingNodeData &&
endingNodeData.category !== 'Chains' &&
endingNodeData.category !== 'Agents' &&
endingNodeData.category !== 'Engine'
) {
error = new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, `Ending node must be either a Chain or Agent`)
continue
}
}
verifiedEndingNodes.push(endingNode)
}
if (verifiedEndingNodes.length > 0) {
return verifiedEndingNodes
}
if (endingNodes.length === 0 || error === null) {
error = new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, `Ending nodes not found`)
}
throw error
} }
/** /**