Merge branch 'main' into FEATURE/Vision

# Conflicts:
#	packages/server/src/index.ts
This commit is contained in:
vinodkiran
2023-12-20 19:17:15 +05:30
279 changed files with 5377 additions and 1336 deletions
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
+1 -1
View File
@@ -1,6 +1,6 @@
{
"name": "flowise",
"version": "1.4.5",
"version": "1.4.6",
"description": "Flowiseai Server",
"main": "dist/index",
"types": "dist/index.d.ts",
+97 -45
View File
@@ -20,13 +20,14 @@ import {
ICredentialReturnResponse,
chatType,
IChatMessage,
IReactFlowEdge
IReactFlowEdge,
IDepthQueue
} from './Interface'
import {
getNodeModulesPackagePath,
getStartingNodes,
buildLangchain,
getEndingNode,
getEndingNodes,
constructGraphs,
resolveVariables,
isStartNodeDependOnInput,
@@ -55,7 +56,7 @@ import { Tool } from './database/entities/Tool'
import { Assistant } from './database/entities/Assistant'
import { ChatflowPool } from './ChatflowPool'
import { CachePool } from './CachePool'
import { ICommonObject, IMessage, INodeOptionsValue } from 'flowise-components'
import { ICommonObject, IMessage, INodeOptionsValue, handleEscapeCharacters } from 'flowise-components'
import { createRateLimiter, getRateLimiter, initializeRateLimiter } from './utils/rateLimit'
import { addAPIKey, compareKeys, deleteAPIKey, getApiKey, getAPIKeys, updateAPIKey } from './utils/apiKey'
import { sanitizeMiddleware } from './utils/XSS'
@@ -281,6 +282,29 @@ export class App {
}
})
// execute custom function node
this.app.post('/api/v1/node-custom-function', async (req: Request, res: Response) => {
const body = req.body
const nodeData = { inputs: body }
if (Object.prototype.hasOwnProperty.call(this.nodesPool.componentNodes, 'customFunction')) {
try {
const nodeInstanceFilePath = this.nodesPool.componentNodes['customFunction'].filePath as string
const nodeModule = await import(nodeInstanceFilePath)
const newNodeInstance = new nodeModule.nodeClass()
const returnData = await newNodeInstance.init(nodeData)
const result = typeof returnData === 'string' ? handleEscapeCharacters(returnData, true) : returnData
return res.json(result)
} catch (error) {
return res.status(500).send(`Error running custom function: ${error}`)
}
} else {
res.status(404).send(`Node customFunction not found`)
return
}
})
// ----------------------------------------
// Chatflows
// ----------------------------------------
@@ -409,19 +433,24 @@ export class App {
const edges = parsedFlowData.edges
const { graph, nodeDependencies } = constructGraphs(nodes, edges)
const endingNodeId = getEndingNode(nodeDependencies, graph)
if (!endingNodeId) return res.status(500).send(`Ending node ${endingNodeId} not found`)
const endingNodeIds = getEndingNodes(nodeDependencies, graph)
if (!endingNodeIds.length) return res.status(500).send(`Ending nodes not found`)
const endingNodeData = nodes.find((nd) => nd.id === endingNodeId)?.data
if (!endingNodeData) return res.status(500).send(`Ending node ${endingNodeId} data not found`)
const endingNodes = nodes.filter((nd) => endingNodeIds.includes(nd.id))
if (endingNodeData && endingNodeData.category !== 'Chains' && endingNodeData.category !== 'Agents') {
return res.status(500).send(`Ending node must be either a Chain or Agent`)
let isStreaming = false
for (const endingNode of endingNodes) {
const endingNodeData = endingNode.data
if (!endingNodeData) return res.status(500).send(`Ending node ${endingNode.id} data not found`)
if (endingNodeData && endingNodeData.category !== 'Chains' && endingNodeData.category !== 'Agents') {
return res.status(500).send(`Ending node must be either a Chain or Agent`)
}
isStreaming = isFlowValidForStream(nodes, endingNodeData)
}
const obj = {
isStreaming: isFlowValidForStream(nodes, endingNodeData)
}
const obj = { isStreaming }
return res.json(obj)
})
@@ -1515,50 +1544,67 @@ export class App {
/*** Get Ending Node with Directed Graph ***/
const { graph, nodeDependencies } = constructGraphs(nodes, edges)
const directedGraph = graph
const endingNodeId = getEndingNode(nodeDependencies, directedGraph)
if (!endingNodeId) return res.status(500).send(`Ending node ${endingNodeId} not found`)
const endingNodeIds = getEndingNodes(nodeDependencies, directedGraph)
if (!endingNodeIds.length) return res.status(500).send(`Ending nodes not found`)
const endingNodeData = nodes.find((nd) => nd.id === endingNodeId)?.data
if (!endingNodeData) return res.status(500).send(`Ending node ${endingNodeId} data not found`)
const endingNodes = nodes.filter((nd) => endingNodeIds.includes(nd.id))
for (const endingNode of endingNodes) {
const endingNodeData = endingNode.data
if (!endingNodeData) return res.status(500).send(`Ending node ${endingNode.id} data not found`)
if (endingNodeData && endingNodeData.category !== 'Chains' && endingNodeData.category !== 'Agents' && !isUpsert) {
if (endingNodeData.type !== 'OpenAIMultiModalChain') {
if (endingNodeData && endingNodeData.category !== 'Chains' && endingNodeData.category !== 'Agents') {
if (endingNodeData.type !== 'OpenAIMultiModalChain') {
return res.status(500).send(`Ending node must be either a Chain or Agent`)
}
}
if (
endingNodeData.outputs &&
Object.keys(endingNodeData.outputs).length &&
!Object.values(endingNodeData.outputs).includes(endingNodeData.name) &&
!isUpsert
) {
return res
.status(500)
.send(
`Output of ${endingNodeData.label} (${endingNodeData.id}) must be ${endingNodeData.label}, can't be an Output Prediction`
)
if (
endingNodeData.outputs &&
Object.keys(endingNodeData.outputs).length &&
!Object.values(endingNodeData.outputs).includes(endingNodeData.name) &&
!isUpsert
) {
return res
.status(500)
.send(
`Output of ${endingNodeData.label} (${endingNodeData.id}) must be ${endingNodeData.label}, can't be an Output Prediction`
)
}
isStreamValid = isFlowValidForStream(nodes, endingNodeData)
}
isStreamValid = isFlowValidForStream(nodes, endingNodeData)
let chatHistory: IMessage[] | string = incomingInput.history
if (
endingNodeData.inputs?.memory &&
!incomingInput.history &&
(incomingInput.chatId || incomingInput.overrideConfig?.sessionId)
) {
const memoryNodeId = endingNodeData.inputs?.memory.split('.')[0].replace('{{', '')
const memoryNode = nodes.find((node) => node.data.id === memoryNodeId)
if (memoryNode) {
chatHistory = await replaceChatHistory(memoryNode, incomingInput, this.AppDataSource, databaseEntities, logger)
// When {{chat_history}} is used in Prompt Template, fetch the chat conversations from memory
for (const endingNode of endingNodes) {
const endingNodeData = endingNode.data
if (!endingNodeData.inputs?.memory) continue
if (
endingNodeData.inputs?.memory &&
!incomingInput.history &&
(incomingInput.chatId || incomingInput.overrideConfig?.sessionId)
) {
const memoryNodeId = endingNodeData.inputs?.memory.split('.')[0].replace('{{', '')
const memoryNode = nodes.find((node) => node.data.id === memoryNodeId)
if (memoryNode) {
chatHistory = await replaceChatHistory(memoryNode, incomingInput, this.AppDataSource, databaseEntities, logger)
}
}
}
/*** Get Starting Nodes with Non-Directed Graph ***/
const constructedObj = constructGraphs(nodes, edges, true)
/*** Get Starting Nodes with Reversed Graph ***/
const constructedObj = constructGraphs(nodes, edges, { isReversed: true })
const nonDirectedGraph = constructedObj.graph
const { startingNodeIds, depthQueue } = getStartingNodes(nonDirectedGraph, endingNodeId)
let startingNodeIds: string[] = []
let depthQueue: IDepthQueue = {}
for (const endingNodeId of endingNodeIds) {
const res = getStartingNodes(nonDirectedGraph, endingNodeId)
startingNodeIds.push(...res.startingNodeIds)
depthQueue = Object.assign(depthQueue, res.depthQueue)
}
startingNodeIds = [...new Set(startingNodeIds)]
const startingNodes = nodes.filter((nd) => startingNodeIds.includes(nd.id))
logger.debug(`[server]: Start building chatflow ${chatflowid}`)
@@ -1566,6 +1612,7 @@ export class App {
const reactFlowNodes = await buildLangchain(
startingNodeIds,
nodes,
edges,
graph,
depthQueue,
this.nodesPool.componentNodes,
@@ -1579,13 +1626,18 @@ export class App {
isUpsert,
incomingInput.stopNodeId
)
// If request is upsert, stop here
if (isUpsert) {
this.chatflowPool.add(chatflowid, undefined, startingNodes, incomingInput?.overrideConfig)
return res.status(201).send('Successfully Upserted')
}
const nodeToExecute = reactFlowNodes.find((node: IReactFlowNode) => node.id === endingNodeId)
if (!nodeToExecute) return res.status(404).send(`Node ${endingNodeId} not found`)
const nodeToExecute =
endingNodeIds.length === 1
? reactFlowNodes.find((node: IReactFlowNode) => endingNodeIds[0] === node.id)
: reactFlowNodes[reactFlowNodes.length - 1]
if (!nodeToExecute) return res.status(404).send(`Node not found`)
if (incomingInput.overrideConfig) {
nodeToExecute.data = replaceInputsWithConfig(nodeToExecute.data, incomingInput.overrideConfig)
+113 -13
View File
@@ -95,9 +95,13 @@ export const getNodeModulesPackagePath = (packageName: string): string => {
* Construct graph and node dependencies score
* @param {IReactFlowNode[]} reactFlowNodes
* @param {IReactFlowEdge[]} reactFlowEdges
* @param {boolean} isNondirected
* @param {{ isNonDirected?: boolean, isReversed?: boolean }} options
*/
export const constructGraphs = (reactFlowNodes: IReactFlowNode[], reactFlowEdges: IReactFlowEdge[], isNondirected = false) => {
export const constructGraphs = (
reactFlowNodes: IReactFlowNode[],
reactFlowEdges: IReactFlowEdge[],
options?: { isNonDirected?: boolean; isReversed?: boolean }
) => {
const nodeDependencies = {} as INodeDependencies
const graph = {} as INodeDirectedGraph
@@ -107,6 +111,23 @@ export const constructGraphs = (reactFlowNodes: IReactFlowNode[], reactFlowEdges
graph[nodeId] = []
}
if (options && options.isReversed) {
for (let i = 0; i < reactFlowEdges.length; i += 1) {
const source = reactFlowEdges[i].source
const target = reactFlowEdges[i].target
if (Object.prototype.hasOwnProperty.call(graph, target)) {
graph[target].push(source)
} else {
graph[target] = [source]
}
nodeDependencies[target] += 1
}
return { graph, nodeDependencies }
}
for (let i = 0; i < reactFlowEdges.length; i += 1) {
const source = reactFlowEdges[i].source
const target = reactFlowEdges[i].target
@@ -117,7 +138,7 @@ export const constructGraphs = (reactFlowNodes: IReactFlowNode[], reactFlowEdges
graph[source] = [target]
}
if (isNondirected) {
if (options && options.isNonDirected) {
if (Object.prototype.hasOwnProperty.call(graph, target)) {
graph[target].push(source)
} else {
@@ -179,21 +200,49 @@ export const getStartingNodes = (graph: INodeDirectedGraph, endNodeId: string) =
return { startingNodeIds, depthQueue: depthQueueReversed }
}
/**
* Get all connected nodes from startnode
* @param {INodeDependencies} graph
* @param {string} startNodeId
*/
export const getAllConnectedNodes = (graph: INodeDirectedGraph, startNodeId: string) => {
const visited = new Set<string>()
const queue: Array<[string]> = [[startNodeId]]
while (queue.length > 0) {
const [currentNode] = queue.shift()!
if (visited.has(currentNode)) {
continue
}
visited.add(currentNode)
for (const neighbor of graph[currentNode]) {
if (!visited.has(neighbor)) {
queue.push([neighbor])
}
}
}
return [...visited]
}
/**
* Get ending node and check if flow is valid
* @param {INodeDependencies} nodeDependencies
* @param {INodeDirectedGraph} graph
*/
export const getEndingNode = (nodeDependencies: INodeDependencies, graph: INodeDirectedGraph) => {
let endingNodeId = ''
export const getEndingNodes = (nodeDependencies: INodeDependencies, graph: INodeDirectedGraph) => {
const endingNodeIds: string[] = []
Object.keys(graph).forEach((nodeId) => {
if (Object.keys(nodeDependencies).length === 1) {
endingNodeId = nodeId
endingNodeIds.push(nodeId)
} else if (!graph[nodeId].length && nodeDependencies[nodeId] > 0) {
endingNodeId = nodeId
endingNodeIds.push(nodeId)
}
})
return endingNodeId
return endingNodeIds
}
/**
@@ -213,6 +262,7 @@ export const getEndingNode = (nodeDependencies: INodeDependencies, graph: INodeD
export const buildLangchain = async (
startingNodeIds: string[],
reactFlowNodes: IReactFlowNode[],
reactFlowEdges: IReactFlowEdge[],
graph: INodeDirectedGraph,
depthQueue: IDepthQueue,
componentNodes: IComponentNodes,
@@ -231,6 +281,8 @@ export const buildLangchain = async (
// Create a Queue and add our initial node in it
const nodeQueue = [] as INodeQueue[]
const exploredNode = {} as IExploredNode
const dynamicVariables = {} as Record<string, unknown>
let ignoreNodeIds: string[] = []
// In the case of infinite loop, only max 3 loops will be executed
const maxLoop = 3
@@ -267,20 +319,59 @@ export const buildLangchain = async (
appDataSource,
databaseEntities,
logger,
cachePool
cachePool,
dynamicVariables
})
logger.debug(`[server]: Finished upserting ${reactFlowNode.data.label} (${reactFlowNode.data.id})`)
break
} else {
logger.debug(`[server]: Initializing ${reactFlowNode.data.label} (${reactFlowNode.data.id})`)
flowNodes[nodeIndex].data.instance = await newNodeInstance.init(reactFlowNodeData, question, {
let outputResult = await newNodeInstance.init(reactFlowNodeData, question, {
chatId,
chatflowid,
appDataSource,
databaseEntities,
logger,
cachePool
cachePool,
dynamicVariables
})
// Save dynamic variables
if (reactFlowNode.data.name === 'setVariable') {
const dynamicVars = outputResult?.dynamicVariables ?? {}
for (const variableKey in dynamicVars) {
dynamicVariables[variableKey] = dynamicVars[variableKey]
}
outputResult = outputResult?.output
}
// Determine which nodes to route next when it comes to ifElse
if (reactFlowNode.data.name === 'ifElseFunction' && typeof outputResult === 'object') {
let sourceHandle = ''
if (outputResult.type === true) {
sourceHandle = `${nodeId}-output-returnFalse-string|number|boolean|json|array`
} else if (outputResult.type === false) {
sourceHandle = `${nodeId}-output-returnTrue-string|number|boolean|json|array`
}
const ifElseEdge = reactFlowEdges.find((edg) => edg.source === nodeId && edg.sourceHandle === sourceHandle)
if (ifElseEdge) {
const { graph } = constructGraphs(
reactFlowNodes,
reactFlowEdges.filter((edg) => !(edg.source === nodeId && edg.sourceHandle === sourceHandle)),
{ isNonDirected: true }
)
ignoreNodeIds.push(ifElseEdge.target, ...getAllConnectedNodes(graph, ifElseEdge.target))
ignoreNodeIds = [...new Set(ignoreNodeIds)]
}
outputResult = outputResult?.output
}
flowNodes[nodeIndex].data.instance = outputResult
logger.debug(`[server]: Finished initializing ${reactFlowNode.data.label} (${reactFlowNode.data.id})`)
}
} catch (e: any) {
@@ -288,7 +379,7 @@ export const buildLangchain = async (
throw new Error(e)
}
const neighbourNodeIds = graph[nodeId]
let neighbourNodeIds = graph[nodeId]
const nextDepth = depth + 1
// Find other nodes that are on the same depth level
@@ -299,9 +390,11 @@ export const buildLangchain = async (
neighbourNodeIds.push(id)
}
neighbourNodeIds = neighbourNodeIds.filter((neigh) => !ignoreNodeIds.includes(neigh))
for (let i = 0; i < neighbourNodeIds.length; i += 1) {
const neighNodeId = neighbourNodeIds[i]
if (ignoreNodeIds.includes(neighNodeId)) continue
// If nodeId has been seen, cycle detected
if (Object.prototype.hasOwnProperty.call(exploredNode, neighNodeId)) {
const { remainingLoop, lastSeenDepth } = exploredNode[neighNodeId]
@@ -319,6 +412,12 @@ export const buildLangchain = async (
nodeQueue.push({ nodeId: neighNodeId, depth: nextDepth })
}
}
// Move end node to last
if (!neighbourNodeIds.length) {
const index = flowNodes.findIndex((nd) => nd.data.id === nodeId)
flowNodes.push(flowNodes.splice(index, 1)[0])
}
}
return flowNodes
}
@@ -711,6 +810,7 @@ export const findAvailableConfigs = (reactFlowNodes: IReactFlowNode[], component
/**
* Check to see if flow valid for stream
* TODO: perform check from component level. i.e: set streaming on component, and check here
* @param {IReactFlowNode[]} reactFlowNodes
* @param {INodeData} endingNodeData
* @returns {boolean}