mirror of
https://github.com/farcasclaudiu/Flowise.git
synced 2026-06-28 11:00:55 +03:00
Merge branch 'main' into feature/env-vars
This commit is contained in:
@@ -97,9 +97,13 @@ export const getNodeModulesPackagePath = (packageName: string): string => {
|
||||
* Construct graph and node dependencies score
|
||||
* @param {IReactFlowNode[]} reactFlowNodes
|
||||
* @param {IReactFlowEdge[]} reactFlowEdges
|
||||
* @param {boolean} isNondirected
|
||||
* @param {{ isNonDirected?: boolean, isReversed?: boolean }} options
|
||||
*/
|
||||
export const constructGraphs = (reactFlowNodes: IReactFlowNode[], reactFlowEdges: IReactFlowEdge[], isNondirected = false) => {
|
||||
export const constructGraphs = (
|
||||
reactFlowNodes: IReactFlowNode[],
|
||||
reactFlowEdges: IReactFlowEdge[],
|
||||
options?: { isNonDirected?: boolean; isReversed?: boolean }
|
||||
) => {
|
||||
const nodeDependencies = {} as INodeDependencies
|
||||
const graph = {} as INodeDirectedGraph
|
||||
|
||||
@@ -109,6 +113,23 @@ export const constructGraphs = (reactFlowNodes: IReactFlowNode[], reactFlowEdges
|
||||
graph[nodeId] = []
|
||||
}
|
||||
|
||||
if (options && options.isReversed) {
|
||||
for (let i = 0; i < reactFlowEdges.length; i += 1) {
|
||||
const source = reactFlowEdges[i].source
|
||||
const target = reactFlowEdges[i].target
|
||||
|
||||
if (Object.prototype.hasOwnProperty.call(graph, target)) {
|
||||
graph[target].push(source)
|
||||
} else {
|
||||
graph[target] = [source]
|
||||
}
|
||||
|
||||
nodeDependencies[target] += 1
|
||||
}
|
||||
|
||||
return { graph, nodeDependencies }
|
||||
}
|
||||
|
||||
for (let i = 0; i < reactFlowEdges.length; i += 1) {
|
||||
const source = reactFlowEdges[i].source
|
||||
const target = reactFlowEdges[i].target
|
||||
@@ -119,7 +140,7 @@ export const constructGraphs = (reactFlowNodes: IReactFlowNode[], reactFlowEdges
|
||||
graph[source] = [target]
|
||||
}
|
||||
|
||||
if (isNondirected) {
|
||||
if (options && options.isNonDirected) {
|
||||
if (Object.prototype.hasOwnProperty.call(graph, target)) {
|
||||
graph[target].push(source)
|
||||
} else {
|
||||
@@ -181,21 +202,49 @@ export const getStartingNodes = (graph: INodeDirectedGraph, endNodeId: string) =
|
||||
return { startingNodeIds, depthQueue: depthQueueReversed }
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all connected nodes from startnode
|
||||
* @param {INodeDependencies} graph
|
||||
* @param {string} startNodeId
|
||||
*/
|
||||
export const getAllConnectedNodes = (graph: INodeDirectedGraph, startNodeId: string) => {
|
||||
const visited = new Set<string>()
|
||||
const queue: Array<[string]> = [[startNodeId]]
|
||||
|
||||
while (queue.length > 0) {
|
||||
const [currentNode] = queue.shift()!
|
||||
|
||||
if (visited.has(currentNode)) {
|
||||
continue
|
||||
}
|
||||
|
||||
visited.add(currentNode)
|
||||
|
||||
for (const neighbor of graph[currentNode]) {
|
||||
if (!visited.has(neighbor)) {
|
||||
queue.push([neighbor])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return [...visited]
|
||||
}
|
||||
|
||||
/**
|
||||
* Get ending node and check if flow is valid
|
||||
* @param {INodeDependencies} nodeDependencies
|
||||
* @param {INodeDirectedGraph} graph
|
||||
*/
|
||||
export const getEndingNode = (nodeDependencies: INodeDependencies, graph: INodeDirectedGraph) => {
|
||||
let endingNodeId = ''
|
||||
export const getEndingNodes = (nodeDependencies: INodeDependencies, graph: INodeDirectedGraph) => {
|
||||
const endingNodeIds: string[] = []
|
||||
Object.keys(graph).forEach((nodeId) => {
|
||||
if (Object.keys(nodeDependencies).length === 1) {
|
||||
endingNodeId = nodeId
|
||||
endingNodeIds.push(nodeId)
|
||||
} else if (!graph[nodeId].length && nodeDependencies[nodeId] > 0) {
|
||||
endingNodeId = nodeId
|
||||
endingNodeIds.push(nodeId)
|
||||
}
|
||||
})
|
||||
return endingNodeId
|
||||
return endingNodeIds
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -215,6 +264,7 @@ export const getEndingNode = (nodeDependencies: INodeDependencies, graph: INodeD
|
||||
export const buildLangchain = async (
|
||||
startingNodeIds: string[],
|
||||
reactFlowNodes: IReactFlowNode[],
|
||||
reactFlowEdges: IReactFlowEdge[],
|
||||
graph: INodeDirectedGraph,
|
||||
depthQueue: IDepthQueue,
|
||||
componentNodes: IComponentNodes,
|
||||
@@ -233,6 +283,8 @@ export const buildLangchain = async (
|
||||
// Create a Queue and add our initial node in it
|
||||
const nodeQueue = [] as INodeQueue[]
|
||||
const exploredNode = {} as IExploredNode
|
||||
const dynamicVariables = {} as Record<string, unknown>
|
||||
let ignoreNodeIds: string[] = []
|
||||
|
||||
// In the case of infinite loop, only max 3 loops will be executed
|
||||
const maxLoop = 3
|
||||
@@ -269,20 +321,59 @@ export const buildLangchain = async (
|
||||
appDataSource,
|
||||
databaseEntities,
|
||||
logger,
|
||||
cachePool
|
||||
cachePool,
|
||||
dynamicVariables
|
||||
})
|
||||
logger.debug(`[server]: Finished upserting ${reactFlowNode.data.label} (${reactFlowNode.data.id})`)
|
||||
break
|
||||
} else {
|
||||
logger.debug(`[server]: Initializing ${reactFlowNode.data.label} (${reactFlowNode.data.id})`)
|
||||
flowNodes[nodeIndex].data.instance = await newNodeInstance.init(reactFlowNodeData, question, {
|
||||
let outputResult = await newNodeInstance.init(reactFlowNodeData, question, {
|
||||
chatId,
|
||||
chatflowid,
|
||||
appDataSource,
|
||||
databaseEntities,
|
||||
logger,
|
||||
cachePool
|
||||
cachePool,
|
||||
dynamicVariables
|
||||
})
|
||||
|
||||
// Save dynamic variables
|
||||
if (reactFlowNode.data.name === 'setVariable') {
|
||||
const dynamicVars = outputResult?.dynamicVariables ?? {}
|
||||
|
||||
for (const variableKey in dynamicVars) {
|
||||
dynamicVariables[variableKey] = dynamicVars[variableKey]
|
||||
}
|
||||
|
||||
outputResult = outputResult?.output
|
||||
}
|
||||
|
||||
// Determine which nodes to route next when it comes to ifElse
|
||||
if (reactFlowNode.data.name === 'ifElseFunction' && typeof outputResult === 'object') {
|
||||
let sourceHandle = ''
|
||||
if (outputResult.type === true) {
|
||||
sourceHandle = `${nodeId}-output-returnFalse-string|number|boolean|json|array`
|
||||
} else if (outputResult.type === false) {
|
||||
sourceHandle = `${nodeId}-output-returnTrue-string|number|boolean|json|array`
|
||||
}
|
||||
|
||||
const ifElseEdge = reactFlowEdges.find((edg) => edg.source === nodeId && edg.sourceHandle === sourceHandle)
|
||||
if (ifElseEdge) {
|
||||
const { graph } = constructGraphs(
|
||||
reactFlowNodes,
|
||||
reactFlowEdges.filter((edg) => !(edg.source === nodeId && edg.sourceHandle === sourceHandle)),
|
||||
{ isNonDirected: true }
|
||||
)
|
||||
ignoreNodeIds.push(ifElseEdge.target, ...getAllConnectedNodes(graph, ifElseEdge.target))
|
||||
ignoreNodeIds = [...new Set(ignoreNodeIds)]
|
||||
}
|
||||
|
||||
outputResult = outputResult?.output
|
||||
}
|
||||
|
||||
flowNodes[nodeIndex].data.instance = outputResult
|
||||
|
||||
logger.debug(`[server]: Finished initializing ${reactFlowNode.data.label} (${reactFlowNode.data.id})`)
|
||||
}
|
||||
} catch (e: any) {
|
||||
@@ -290,7 +381,7 @@ export const buildLangchain = async (
|
||||
throw new Error(e)
|
||||
}
|
||||
|
||||
const neighbourNodeIds = graph[nodeId]
|
||||
let neighbourNodeIds = graph[nodeId]
|
||||
const nextDepth = depth + 1
|
||||
|
||||
// Find other nodes that are on the same depth level
|
||||
@@ -301,9 +392,11 @@ export const buildLangchain = async (
|
||||
neighbourNodeIds.push(id)
|
||||
}
|
||||
|
||||
neighbourNodeIds = neighbourNodeIds.filter((neigh) => !ignoreNodeIds.includes(neigh))
|
||||
|
||||
for (let i = 0; i < neighbourNodeIds.length; i += 1) {
|
||||
const neighNodeId = neighbourNodeIds[i]
|
||||
|
||||
if (ignoreNodeIds.includes(neighNodeId)) continue
|
||||
// If nodeId has been seen, cycle detected
|
||||
if (Object.prototype.hasOwnProperty.call(exploredNode, neighNodeId)) {
|
||||
const { remainingLoop, lastSeenDepth } = exploredNode[neighNodeId]
|
||||
@@ -321,6 +414,12 @@ export const buildLangchain = async (
|
||||
nodeQueue.push({ nodeId: neighNodeId, depth: nextDepth })
|
||||
}
|
||||
}
|
||||
|
||||
// Move end node to last
|
||||
if (!neighbourNodeIds.length) {
|
||||
const index = flowNodes.findIndex((nd) => nd.data.id === nodeId)
|
||||
flowNodes.push(flowNodes.splice(index, 1)[0])
|
||||
}
|
||||
}
|
||||
return flowNodes
|
||||
}
|
||||
@@ -713,6 +812,7 @@ export const findAvailableConfigs = (reactFlowNodes: IReactFlowNode[], component
|
||||
|
||||
/**
|
||||
* Check to see if flow valid for stream
|
||||
* TODO: perform check from component level. i.e: set streaming on component, and check here
|
||||
* @param {IReactFlowNode[]} reactFlowNodes
|
||||
* @param {INodeData} endingNodeData
|
||||
* @returns {boolean}
|
||||
|
||||
Reference in New Issue
Block a user