Enable/disable variables in override configuration (#3467)

* Add ability to enable/disable which variables can be overriden during external predictions

* Remove duplicated code

* Remove rate limit and allowed domains tab from chatflow configuration

* Show tooltip in api code dialog for override config properties

* Fix server crash when override config is not available

* update UI for chatflow config security, file upload

* Fix UI issues in security tab of chatflow configuration dialog

* Fix override config options not updating when nodes change

* Fix crash in api code dialog when overrideConfig is not available for a chatflow/agentflow. Also fix input config in api code dialog not updating when nodes change.

* Refactor override config and add override config for variables

* Update api code dialog - update how override config is read and show variable overrides

* Update how node and variable overrides are read and resolved

* Prevent api code dialog mounting on page load and only mount when api code dialog button is clicked. this should fix loading incorrect data.

* Fix variables list not showing when overrideConfig is not available

* add overrideconfig to agentflow and upsert vector

* temporarily removed enable overrideconfig on upsert, fix linting issues

---------

Co-authored-by: Henry <hzj94@hotmail.com>
This commit is contained in:
Ilango
2024-11-13 23:51:59 +05:30
committed by GitHub
parent 3b72017a10
commit 537aa51ef8
16 changed files with 1053 additions and 289 deletions
+182 -119
View File
@@ -37,7 +37,8 @@ import {
databaseEntities,
getSessionChatHistory,
getMemorySessionId,
clearSessionMemory
clearSessionMemory,
getAPIOverrideConfig
} from '../utils'
import { getRunningExpressApp } from './getRunningExpressApp'
import { replaceInputsWithConfig, resolveVariables } from '.'
@@ -111,6 +112,9 @@ export const buildAgentGraph = async (
)
}
/*** Get API Config ***/
const { nodeOverrides, variableOverrides, apiOverrideStatus } = getAPIOverrideConfig(chatflow)
// Initialize nodes like ChatModels, Tools, etc.
const reactFlowNodes: IReactFlowNode[] = await buildFlow({
startingNodeIds,
@@ -121,17 +125,20 @@ export const buildAgentGraph = async (
depthQueue,
componentNodes: appServer.nodesPool.componentNodes,
question: incomingInput.question,
uploadedFilesContent,
chatHistory,
chatId,
sessionId,
chatflowid,
appDataSource: appServer.AppDataSource,
overrideConfig: incomingInput?.overrideConfig,
apiOverrideStatus,
nodeOverrides,
variableOverrides,
cachePool: appServer.cachePool,
isUpsert: false,
uploads: incomingInput.uploads,
baseURL,
uploadedFilesContent
baseURL
})
const options = {
@@ -177,39 +184,39 @@ export const buildAgentGraph = async (
try {
if (!seqAgentNodes.length) {
streamResults = await compileMultiAgentsGraph(
streamResults = await compileMultiAgentsGraph({
chatflow,
mapNameToLabel,
reactFlowNodes,
endingNodeIds,
appServer.nodesPool.componentNodes,
workerNodeIds: endingNodeIds,
componentNodes: appServer.nodesPool.componentNodes,
options,
startingNodeIds,
incomingInput.question,
incomingInput.history,
question: incomingInput.question,
prependHistoryMessages: incomingInput.history,
chatHistory,
incomingInput?.overrideConfig,
sessionId || chatId,
seqAgentNodes.some((node) => node.data.inputs?.summarization),
overrideConfig: incomingInput?.overrideConfig,
threadId: sessionId || chatId,
summarization: seqAgentNodes.some((node) => node.data.inputs?.summarization),
uploadedFilesContent
)
})
} else {
isSequential = true
streamResults = await compileSeqAgentsGraph(
streamResults = await compileSeqAgentsGraph({
depthQueue,
chatflow,
reactFlowNodes,
edges,
appServer.nodesPool.componentNodes,
reactFlowEdges: edges,
componentNodes: appServer.nodesPool.componentNodes,
options,
incomingInput.question,
incomingInput.history,
question: incomingInput.question,
prependHistoryMessages: incomingInput.history,
chatHistory,
incomingInput?.overrideConfig,
sessionId || chatId,
incomingInput.action,
overrideConfig: incomingInput?.overrideConfig,
threadId: sessionId || chatId,
action: incomingInput.action,
uploadedFilesContent
)
})
}
if (streamResults) {
@@ -367,7 +374,11 @@ export const buildAgentGraph = async (
// Map raw tool calls to used tools, to be shown on interrupted message
const mappedToolCalls = lastMessageRaw.tool_calls.map((toolCall) => {
return { tool: toolCall.name, toolInput: toolCall.args, toolOutput: '' }
return {
tool: toolCall.name,
toolInput: toolCall.args,
toolOutput: ''
}
})
// Emit the interrupt message to the client
@@ -388,7 +399,11 @@ export const buildAgentGraph = async (
}
finalAction = {
id: uuidv4(),
mapping: { approve: approveButtonText, reject: rejectButtonText, toolCalls: lastMessageRaw.tool_calls },
mapping: {
approve: approveButtonText,
reject: rejectButtonText,
toolCalls: lastMessageRaw.tool_calls
},
elements: [
{ type: 'approve-button', label: approveButtonText },
{ type: 'reject-button', label: rejectButtonText }
@@ -446,37 +461,42 @@ export const buildAgentGraph = async (
}
}
/**
* Compile Multi Agents Graph
* @param {IChatFlow} chatflow
* @param {Record<string, {label: string, nodeName: string }>} mapNameToLabel
* @param {IReactFlowNode[]} reactflowNodes
* @param {string[]} workerNodeIds
* @param {IComponentNodes} componentNodes
* @param {ICommonObject} options
* @param {string[]} startingNodeIds
* @param {string} question
* @param {ICommonObject} overrideConfig
* @param {string} threadId
* @param {boolean} summarization
* @param {string} uploadedFilesContent,
*/
const compileMultiAgentsGraph = async (
chatflow: IChatFlow,
mapNameToLabel: Record<string, { label: string; nodeName: string }>,
reactflowNodes: IReactFlowNode[] = [],
workerNodeIds: string[],
componentNodes: IComponentNodes,
options: ICommonObject,
startingNodeIds: string[],
question: string,
prependHistoryMessages: IMessage[] = [],
chatHistory: IMessage[] = [],
overrideConfig?: ICommonObject,
threadId?: string,
summarization?: boolean,
type MultiAgentsGraphParams = {
chatflow: IChatFlow
mapNameToLabel: Record<string, { label: string; nodeName: string }>
reactFlowNodes: IReactFlowNode[]
workerNodeIds: string[]
componentNodes: IComponentNodes
options: ICommonObject
startingNodeIds: string[]
question: string
prependHistoryMessages?: IMessage[]
chatHistory?: IMessage[]
overrideConfig?: ICommonObject
threadId?: string
summarization?: boolean
uploadedFilesContent?: string
) => {
}
const compileMultiAgentsGraph = async (params: MultiAgentsGraphParams) => {
const {
chatflow,
mapNameToLabel,
reactFlowNodes,
workerNodeIds,
componentNodes,
options,
startingNodeIds,
prependHistoryMessages = [],
chatHistory = [],
overrideConfig = {},
threadId,
summarization = false,
uploadedFilesContent
} = params
let question = params.question
const appServer = getRunningExpressApp()
const channels: ITeamState = {
messages: {
@@ -495,7 +515,10 @@ const compileMultiAgentsGraph = async (
channels
})
const workerNodes = reactflowNodes.filter((node) => workerNodeIds.includes(node.data.id))
const workerNodes = reactFlowNodes.filter((node) => workerNodeIds.includes(node.data.id))
/*** Get API Config ***/
const { nodeOverrides, variableOverrides, apiOverrideStatus } = getAPIOverrideConfig(chatflow)
let supervisorWorkers: { [key: string]: IMultiAgentNode[] } = {}
@@ -506,15 +529,16 @@ const compileMultiAgentsGraph = async (
const newNodeInstance = new nodeModule.nodeClass()
let flowNodeData = cloneDeep(workerNode.data)
if (overrideConfig) flowNodeData = replaceInputsWithConfig(flowNodeData, overrideConfig)
if (overrideConfig && apiOverrideStatus) flowNodeData = replaceInputsWithConfig(flowNodeData, overrideConfig, nodeOverrides)
flowNodeData = await resolveVariables(
appServer.AppDataSource,
flowNodeData,
reactflowNodes,
reactFlowNodes,
question,
chatHistory,
overrideConfig,
uploadedFilesContent
uploadedFilesContent,
variableOverrides
)
try {
@@ -536,7 +560,7 @@ const compileMultiAgentsGraph = async (
// Init supervisor nodes
for (const supervisor in supervisorWorkers) {
const supervisorInputLabel = mapNameToLabel[supervisor].label
const supervisorNode = reactflowNodes.find((node) => supervisorInputLabel === node.data.inputs?.supervisorName)
const supervisorNode = reactFlowNodes.find((node) => supervisorInputLabel === node.data.inputs?.supervisorName)
if (!supervisorNode) continue
const nodeInstanceFilePath = componentNodes[supervisorNode.data.name].filePath as string
@@ -545,15 +569,16 @@ const compileMultiAgentsGraph = async (
let flowNodeData = cloneDeep(supervisorNode.data)
if (overrideConfig) flowNodeData = replaceInputsWithConfig(flowNodeData, overrideConfig)
if (overrideConfig && apiOverrideStatus) flowNodeData = replaceInputsWithConfig(flowNodeData, overrideConfig, nodeOverrides)
flowNodeData = await resolveVariables(
appServer.AppDataSource,
flowNodeData,
reactflowNodes,
reactFlowNodes,
question,
chatHistory,
overrideConfig,
uploadedFilesContent
uploadedFilesContent,
variableOverrides
)
if (flowNodeData.inputs) flowNodeData.inputs.workerNodes = supervisorWorkers[supervisor]
@@ -598,7 +623,7 @@ const compileMultiAgentsGraph = async (
appServer.chatflowPool.add(
`${chatflow.id}_${options.chatId}`,
workflowGraph as any,
reactflowNodes.filter((node) => startingNodeIds.includes(node.id)),
reactFlowNodes.filter((node) => startingNodeIds.includes(node.id)),
overrideConfig
)
@@ -616,9 +641,17 @@ const compileMultiAgentsGraph = async (
if (prependHistoryMessages.length === chatHistory.length) {
for (const message of prependHistoryMessages) {
if (message.role === 'apiMessage' || message.type === 'apiMessage') {
prependMessages.push(new AIMessage({ content: message.message || message.content || '' }))
prependMessages.push(
new AIMessage({
content: message.message || message.content || ''
})
)
} else if (message.role === 'userMessage' || message.type === 'userMessage') {
prependMessages.push(new HumanMessage({ content: message.message || message.content || '' }))
prependMessages.push(
new HumanMessage({
content: message.message || message.content || ''
})
)
}
}
}
@@ -629,7 +662,11 @@ const compileMultiAgentsGraph = async (
{
messages: [...prependMessages, new HumanMessage({ content: finalQuestion })]
},
{ recursionLimit: supervisorResult?.recursionLimit ?? 100, callbacks: [loggerHandler, ...callbacks], configurable: config }
{
recursionLimit: supervisorResult?.recursionLimit ?? 100,
callbacks: [loggerHandler, ...callbacks],
configurable: config
}
)
} catch (e) {
throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, `Error initialize supervisor nodes - ${getErrorMessage(e)}`)
@@ -637,35 +674,40 @@ const compileMultiAgentsGraph = async (
}
}
/**
* Compile Seq Agents Graph
* @param {IDepthQueue} depthQueue
* @param {IChatFlow} chatflow
* @param {IReactFlowNode[]} reactflowNodes
* @param {IReactFlowEdge[]} reactflowEdges
* @param {IComponentNodes} componentNodes
* @param {ICommonObject} options
* @param {string} question
* @param {IMessage[]} chatHistory
* @param {ICommonObject} overrideConfig
* @param {string} threadId
* @param {IAction} action
*/
const compileSeqAgentsGraph = async (
depthQueue: IDepthQueue,
chatflow: IChatFlow,
reactflowNodes: IReactFlowNode[] = [],
reactflowEdges: IReactFlowEdge[] = [],
componentNodes: IComponentNodes,
options: ICommonObject,
question: string,
prependHistoryMessages: IMessage[] = [],
chatHistory: IMessage[] = [],
overrideConfig?: ICommonObject,
threadId?: string,
action?: IAction,
type SeqAgentsGraphParams = {
depthQueue: IDepthQueue
chatflow: IChatFlow
reactFlowNodes: IReactFlowNode[]
reactFlowEdges: IReactFlowEdge[]
componentNodes: IComponentNodes
options: ICommonObject
question: string
prependHistoryMessages?: IMessage[]
chatHistory?: IMessage[]
overrideConfig?: ICommonObject
threadId?: string
action?: IAction
uploadedFilesContent?: string
) => {
}
const compileSeqAgentsGraph = async (params: SeqAgentsGraphParams) => {
const {
depthQueue,
chatflow,
reactFlowNodes,
reactFlowEdges,
componentNodes,
options,
prependHistoryMessages = [],
chatHistory = [],
overrideConfig = {},
threadId,
action,
uploadedFilesContent
} = params
let question = params.question
const appServer = getRunningExpressApp()
let channels: ISeqAgentsState = {
@@ -676,7 +718,7 @@ const compileSeqAgentsGraph = async (
}
// Get state
const seqStateNode = reactflowNodes.find((node: IReactFlowNode) => node.data.name === 'seqState')
const seqStateNode = reactFlowNodes.find((node: IReactFlowNode) => node.data.name === 'seqState')
if (seqStateNode) {
channels = {
...seqStateNode.data.instance.node,
@@ -690,13 +732,13 @@ const compileSeqAgentsGraph = async (
})
/*** Validate Graph ***/
const startAgentNodes: IReactFlowNode[] = reactflowNodes.filter((node: IReactFlowNode) => node.data.name === 'seqStart')
const startAgentNodes: IReactFlowNode[] = reactFlowNodes.filter((node: IReactFlowNode) => node.data.name === 'seqStart')
if (!startAgentNodes.length) throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, 'Start node not found')
if (startAgentNodes.length > 1)
throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, 'Graph should have only one start node')
const endAgentNodes: IReactFlowNode[] = reactflowNodes.filter((node: IReactFlowNode) => node.data.name === 'seqEnd')
const loopNodes: IReactFlowNode[] = reactflowNodes.filter((node: IReactFlowNode) => node.data.name === 'seqLoop')
const endAgentNodes: IReactFlowNode[] = reactFlowNodes.filter((node: IReactFlowNode) => node.data.name === 'seqEnd')
const loopNodes: IReactFlowNode[] = reactFlowNodes.filter((node: IReactFlowNode) => node.data.name === 'seqLoop')
if (!endAgentNodes.length && !loopNodes.length) {
throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, 'Graph should have at least one End/Loop node')
}
@@ -708,6 +750,7 @@ const compileSeqAgentsGraph = async (
let conditionalToolNodes: Record<string, { source: ISeqAgentNode; toolNodes: ISeqAgentNode[] }> = {}
let bindModel: Record<string, any> = {}
let interruptToolNodeNames = []
const { nodeOverrides, variableOverrides, apiOverrideStatus } = getAPIOverrideConfig(chatflow)
const initiateNode = async (node: IReactFlowNode) => {
const nodeInstanceFilePath = componentNodes[node.data.name].filePath as string
@@ -715,15 +758,16 @@ const compileSeqAgentsGraph = async (
const newNodeInstance = new nodeModule.nodeClass()
flowNodeData = cloneDeep(node.data)
if (overrideConfig) flowNodeData = replaceInputsWithConfig(flowNodeData, overrideConfig)
if (overrideConfig && apiOverrideStatus) flowNodeData = replaceInputsWithConfig(flowNodeData, overrideConfig, nodeOverrides)
flowNodeData = await resolveVariables(
appServer.AppDataSource,
flowNodeData,
reactflowNodes,
reactFlowNodes,
question,
chatHistory,
overrideConfig,
uploadedFilesContent
uploadedFilesContent,
variableOverrides
)
const seqAgentNode: ISeqAgentNode = await newNodeInstance.init(flowNodeData, question, options)
@@ -740,16 +784,16 @@ const compileSeqAgentsGraph = async (
* 2.) With the interruptedRouteMapping object, avoid adding conditional edges to the Interrupted Agent for the nodes that are already interrupted by tools. It will be separately added from the function - agentInterruptToolFunc
*/
const processInterruptedRouteMapping = (conditionNodeId: string) => {
const conditionEdges = reactflowEdges.filter((edge) => edge.source === conditionNodeId) ?? []
const conditionEdges = reactFlowEdges.filter((edge) => edge.source === conditionNodeId) ?? []
for (const conditionEdge of conditionEdges) {
const nextNodeId = conditionEdge.target
const conditionNodeOutputAnchorId = conditionEdge.sourceHandle
const nextNode = reactflowNodes.find((node) => node.id === nextNodeId)
const nextNode = reactFlowNodes.find((node) => node.id === nextNodeId)
if (!nextNode) continue
const conditionNode = reactflowNodes.find((node) => node.id === conditionNodeId)
const conditionNode = reactFlowNodes.find((node) => node.id === conditionNodeId)
if (!conditionNode) continue
const outputAnchors = conditionNode?.data.outputAnchors
@@ -780,13 +824,13 @@ const compileSeqAgentsGraph = async (
* }
*/
const prepareConditionalEdges = (nodeId: string, nodeInstance: ISeqAgentNode) => {
const conditionEdges = reactflowEdges.filter((edge) => edge.target === nodeId && edge.source.includes('seqCondition')) ?? []
const conditionEdges = reactFlowEdges.filter((edge) => edge.target === nodeId && edge.source.includes('seqCondition')) ?? []
for (const conditionEdge of conditionEdges) {
const conditionNodeId = conditionEdge.source
const conditionNodeOutputAnchorId = conditionEdge.sourceHandle
const conditionNode = reactflowNodes.find((node) => node.id === conditionNodeId)
const conditionNode = reactFlowNodes.find((node) => node.id === conditionNodeId)
const outputAnchors = conditionNode?.data.outputAnchors
if (!outputAnchors || !outputAnchors.length || !outputAnchors[0].options) continue
@@ -799,7 +843,10 @@ const compileSeqAgentsGraph = async (
if (Object.prototype.hasOwnProperty.call(conditionalEdges, conditionNodeId)) {
conditionalEdges[conditionNodeId] = {
...conditionalEdges[conditionNodeId],
nodes: { ...conditionalEdges[conditionNodeId].nodes, [conditionOutputAnchorLabel]: nodeInstance.name }
nodes: {
...conditionalEdges[conditionNodeId].nodes,
[conditionOutputAnchorLabel]: nodeInstance.name
}
}
} else {
conditionalEdges[conditionNodeId] = {
@@ -820,7 +867,10 @@ const compileSeqAgentsGraph = async (
if (Object.prototype.hasOwnProperty.call(conditionalToolNodes, predecessorAgent.id)) {
const toolNodes = conditionalToolNodes[predecessorAgent.id].toolNodes
toolNodes.push(toolNodeInstance)
conditionalToolNodes[predecessorAgent.id] = { source: predecessorAgent, toolNodes }
conditionalToolNodes[predecessorAgent.id] = {
source: predecessorAgent,
toolNodes
}
} else {
conditionalToolNodes[predecessorAgent.id] = {
source: predecessorAgent,
@@ -837,7 +887,7 @@ const compileSeqAgentsGraph = async (
/*** Start processing every Agent nodes ***/
for (const agentNodeId of getSortedDepthNodes(depthQueue)) {
const agentNode = reactflowNodes.find((node) => node.id === agentNodeId)
const agentNode = reactFlowNodes.find((node) => node.id === agentNodeId)
if (!agentNode) continue
const eligibleSeqNodes = ['seqAgent', 'seqEnd', 'seqLoop', 'seqToolNode', 'seqLLMNode']
@@ -859,8 +909,8 @@ const compileSeqAgentsGraph = async (
if (agentInstance.type === 'agent' && agentNode.data.inputs?.interrupt) {
interruptToolNodeNames.push(agentInstance.agentInterruptToolNode.name)
const nextNodeId = reactflowEdges.find((edge) => edge.source === agentNode.id)?.target
const nextNode = reactflowNodes.find((node) => node.id === nextNodeId)
const nextNodeId = reactFlowEdges.find((edge) => edge.source === agentNode.id)?.target
const nextNode = reactFlowNodes.find((node) => node.id === nextNodeId)
let nextNodeSeqAgentName = ''
if (nextNodeId && nextNode) {
@@ -950,11 +1000,11 @@ const compileSeqAgentsGraph = async (
/*** Add conditional edges to graph for condition nodes ***/
for (const conditionNodeId in conditionalEdges) {
const startConditionEdges = reactflowEdges.filter((edge) => edge.target === conditionNodeId)
const startConditionEdges = reactFlowEdges.filter((edge) => edge.target === conditionNodeId)
if (!startConditionEdges.length) continue
for (const startConditionEdge of startConditionEdges) {
const startConditionNode = reactflowNodes.find((node) => node.id === startConditionEdge.source)
const startConditionNode = reactFlowNodes.find((node) => node.id === startConditionEdge.source)
if (!startConditionNode) continue
seqGraph.addConditionalEdges(
startConditionNode.data.instance.name,
@@ -995,22 +1045,24 @@ const compileSeqAgentsGraph = async (
routeMessage
)
}
/*** Add agentflow to pool ***/
;(seqGraph as any).signal = options.signal
appServer.chatflowPool.add(
`${chatflow.id}_${options.chatId}`,
seqGraph as any,
reactflowNodes.filter((node) => startAgentNodes.map((nd) => nd.id).includes(node.id)),
reactFlowNodes.filter((node) => startAgentNodes.map((nd) => nd.id).includes(node.id)),
overrideConfig
)
/*** Get memory ***/
const startNode = reactflowNodes.find((node: IReactFlowNode) => node.data.name === 'seqStart')
const startNode = reactFlowNodes.find((node: IReactFlowNode) => node.data.name === 'seqStart')
let memory = startNode?.data.instance?.checkpointMemory
try {
const graph = seqGraph.compile({ checkpointer: memory, interruptBefore: interruptToolNodeNames as any })
const graph = seqGraph.compile({
checkpointer: memory,
interruptBefore: interruptToolNodeNames as any
})
const loggerHandler = new ConsoleCallbackHandler(logger)
const callbacks = await additionalCallbacks(flowNodeData as any, options)
@@ -1021,9 +1073,17 @@ const compileSeqAgentsGraph = async (
if (prependHistoryMessages.length === chatHistory.length) {
for (const message of prependHistoryMessages) {
if (message.role === 'apiMessage' || message.type === 'apiMessage') {
prependMessages.push(new AIMessage({ content: message.message || message.content || '' }))
prependMessages.push(
new AIMessage({
content: message.message || message.content || ''
})
)
} else if (message.role === 'userMessage' || message.type === 'userMessage') {
prependMessages.push(new HumanMessage({ content: message.message || message.content || '' }))
prependMessages.push(
new HumanMessage({
content: message.message || message.content || ''
})
)
}
}
}
@@ -1047,7 +1107,10 @@ const compileSeqAgentsGraph = async (
})
}
}
return await graph.stream(humanMsg, { callbacks: [loggerHandler, ...callbacks], configurable: config })
return await graph.stream(humanMsg, {
callbacks: [loggerHandler, ...callbacks],
configurable: config
})
} catch (e) {
logger.error('Error compile graph', e)
throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, `Error compile graph - ${getErrorMessage(e)}`)