diff --git a/packages/server/src/services/chatflows/index.ts b/packages/server/src/services/chatflows/index.ts index 58bbfbb1..03c801f0 100644 --- a/packages/server/src/services/chatflows/index.ts +++ b/packages/server/src/services/chatflows/index.ts @@ -41,40 +41,20 @@ const checkIfChatflowIsValidForStreaming = async (chatflowId: string): Promise endingNodeIds.includes(nd.id)) + const endingNodes = getEndingNodes(nodeDependencies, graph, nodes) let isStreaming = false - let isEndingNodeExists = endingNodes.find((node) => node.data?.outputs?.output === 'EndingNode') - for (const endingNode of endingNodes) { const endingNodeData = endingNode.data - if (!endingNodeData) { - throw new InternalFlowiseError(StatusCodes.NOT_FOUND, `Ending node ${endingNode.id} data not found`) - } - const isEndingNode = endingNodeData?.outputs?.output === 'EndingNode' - - if (!isEndingNode) { - if ( - endingNodeData && - endingNodeData.category !== 'Chains' && - endingNodeData.category !== 'Agents' && - endingNodeData.category !== 'Engine' - ) { - throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, `Ending node must be either a Chain or Agent`) - } + // Once custom function ending node exists, flow is always unavailable to stream + if (isEndingNode) { + return { isStreaming: false } } - - isStreaming = isEndingNode ? false : isFlowValidForStream(nodes, endingNodeData) + isStreaming = isFlowValidForStream(nodes, endingNodeData) } - // Once custom function ending node exists, flow is always unavailable to stream - const dbResponse = { isStreaming: isEndingNodeExists ? false : isStreaming } + const dbResponse = { isStreaming: isStreaming } return dbResponse } catch (error) { throw new InternalFlowiseError( diff --git a/packages/server/src/utils/buildChatflow.ts b/packages/server/src/utils/buildChatflow.ts index 42118b4d..3bedbd4b 100644 --- a/packages/server/src/utils/buildChatflow.ts +++ b/packages/server/src/utils/buildChatflow.ts @@ -179,50 +179,35 @@ export const utilBuildChatflow = async (req: Request, socketIO?: Server, isInter /*** Get Ending Node with Directed Graph ***/ const { graph, nodeDependencies } = constructGraphs(nodes, edges) const directedGraph = graph - const endingNodeIds = getEndingNodes(nodeDependencies, directedGraph) - if (!endingNodeIds.length) { - throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, `Ending nodes not found`) - } - const endingNodes = nodes.filter((nd) => endingNodeIds.includes(nd.id)) + const endingNodes = getEndingNodes(nodeDependencies, directedGraph, nodes) - let isEndingNodeExists = endingNodes.find((node) => node.data?.outputs?.output === 'EndingNode') + let isCustomFunctionEndingNode = endingNodes.some((node) => node.data?.outputs?.output === 'EndingNode') for (const endingNode of endingNodes) { const endingNodeData = endingNode.data - if (!endingNodeData) { - throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, `Ending node ${endingNode.id} data not found`) - } const isEndingNode = endingNodeData?.outputs?.output === 'EndingNode' - if (!isEndingNode) { - if ( - endingNodeData && - endingNodeData.category !== 'Chains' && - endingNodeData.category !== 'Agents' && - endingNodeData.category !== 'Engine' - ) { - throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, `Ending node must be either a Chain or Agent`) - } + // Once custom function ending node exists, no need to do follow-up checks. + if (isEndingNode) continue - if ( - endingNodeData.outputs && - Object.keys(endingNodeData.outputs).length && - !Object.values(endingNodeData.outputs ?? {}).includes(endingNodeData.name) - ) { - throw new InternalFlowiseError( - StatusCodes.INTERNAL_SERVER_ERROR, - `Output of ${endingNodeData.label} (${endingNodeData.id}) must be ${endingNodeData.label}, can't be an Output Prediction` - ) - } + if ( + endingNodeData.outputs && + Object.keys(endingNodeData.outputs).length && + !Object.values(endingNodeData.outputs ?? {}).includes(endingNodeData.name) + ) { + throw new InternalFlowiseError( + StatusCodes.INTERNAL_SERVER_ERROR, + `Output of ${endingNodeData.label} (${endingNodeData.id}) must be ${endingNodeData.label}, can't be an Output Prediction` + ) } isStreamValid = isFlowValidForStream(nodes, endingNodeData) } // Once custom function ending node exists, flow is always unavailable to stream - isStreamValid = isEndingNodeExists ? false : isStreamValid + isStreamValid = isCustomFunctionEndingNode ? false : isStreamValid let chatHistory: IMessage[] = [] @@ -253,6 +238,7 @@ export const utilBuildChatflow = async (req: Request, socketIO?: Server, isInter const nonDirectedGraph = constructedObj.graph let startingNodeIds: string[] = [] let depthQueue: IDepthQueue = {} + const endingNodeIds = endingNodes.map((n) => n.id) for (const endingNodeId of endingNodeIds) { const resx = getStartingNodes(nonDirectedGraph, endingNodeId) startingNodeIds.push(...resx.startingNodeIds) diff --git a/packages/server/src/utils/index.ts b/packages/server/src/utils/index.ts index 94551c13..311b3b9b 100644 --- a/packages/server/src/utils/index.ts +++ b/packages/server/src/utils/index.ts @@ -41,6 +41,8 @@ import { Assistant } from '../database/entities/Assistant' import { DataSource } from 'typeorm' import { CachePool } from '../CachePool' import { Variable } from '../database/entities/Variable' +import { InternalFlowiseError } from '../errors/internalFlowiseError' +import { StatusCodes } from 'http-status-codes' const QUESTION_VAR_PREFIX = 'question' const CHAT_HISTORY_VAR_PREFIX = 'chat_history' @@ -224,8 +226,13 @@ export const getAllConnectedNodes = (graph: INodeDirectedGraph, startNodeId: str * Get ending node and check if flow is valid * @param {INodeDependencies} nodeDependencies * @param {INodeDirectedGraph} graph + * @param {IReactFlowNode[]} allNodes */ -export const getEndingNodes = (nodeDependencies: INodeDependencies, graph: INodeDirectedGraph) => { +export const getEndingNodes = ( + nodeDependencies: INodeDependencies, + graph: INodeDirectedGraph, + allNodes: IReactFlowNode[] +): IReactFlowNode[] => { const endingNodeIds: string[] = [] Object.keys(graph).forEach((nodeId) => { if (Object.keys(nodeDependencies).length === 1) { @@ -234,7 +241,46 @@ export const getEndingNodes = (nodeDependencies: INodeDependencies, graph: INode endingNodeIds.push(nodeId) } }) - return endingNodeIds + + let endingNodes = allNodes.filter((nd) => endingNodeIds.includes(nd.id)) + + // If there are multiple endingnodes, the failed ones will be automatically ignored. + // And only ensure that at least one can pass the verification. + const verifiedEndingNodes: typeof endingNodes = [] + let error: InternalFlowiseError | null = null + for (const endingNode of endingNodes) { + const endingNodeData = endingNode.data + if (!endingNodeData) { + error = new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, `Ending node ${endingNode.id} data not found`) + + continue + } + + const isEndingNode = endingNodeData?.outputs?.output === 'EndingNode' + + if (!isEndingNode) { + if ( + endingNodeData && + endingNodeData.category !== 'Chains' && + endingNodeData.category !== 'Agents' && + endingNodeData.category !== 'Engine' + ) { + error = new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, `Ending node must be either a Chain or Agent`) + continue + } + } + verifiedEndingNodes.push(endingNode) + } + + if (verifiedEndingNodes.length > 0) { + return verifiedEndingNodes + } + + if (endingNodes.length === 0 || error === null) { + error = new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, `Ending nodes not found`) + } + + throw error } /**