Feature/seq agents (#2798)

* update build functions

* sequential agents

* update langchain to 0.2, added sequential agent nodes

* add marketplace templates

* update howto wordings

* Merge branch 'main' into feature/Seq-Agents

# Conflicts:
#	pnpm-lock.yaml

* update deprecated functions and add new sequential nodes

* add marketplace templates

* update marketplace templates, add structured output to llm node

* add multi agents template

* update llm node with bindmodels

* update cypress version

* update templates sticky note wordings

* update tool node to include human in loop action

* update structured outputs error from models

* update cohere package to resolve google genai pipeThrough bug

* update mistral package version, added message reconstruction before invoke seq agent

* add HITL to agent

* update state messages restructuring

* update load and split methods for s3 directory
This commit is contained in:
Henry Heng
2024-07-22 17:46:14 +01:00
committed by GitHub
parent 34d0e4302c
commit bca4de0c63
152 changed files with 55307 additions and 35236 deletions
+3
View File
@@ -1,3 +1,4 @@
import { IAction } from 'flowise-components'
import { ICommonObject, IFileUpload, INode, INodeData as INodeDataFromComponent, INodeParams } from 'flowise-components'
export type MessageType = 'apiMessage' | 'userMessage'
@@ -48,6 +49,7 @@ export interface IChatMessage {
sessionId?: string
createdDate: Date
leadEmail?: string
action?: string | null
}
export interface IChatMessageFeedback {
@@ -218,6 +220,7 @@ export interface IncomingInput {
uploads?: IFileUpload[]
leadEmail?: string
history?: IMessage[]
action?: IAction
}
export interface IActiveChatflows {
-3
View File
@@ -136,9 +136,6 @@ export default class Start extends Command {
// Telemetry
if (flags.DISABLE_FLOWISE_TELEMETRY) process.env.DISABLE_FLOWISE_TELEMETRY = flags.DISABLE_FLOWISE_TELEMETRY
// Disable langchain warnings
process.env.LANGCHAIN_SUPPRESS_MIGRATION_WARNINGS = 'true'
// Model list config
if (flags.MODEL_LIST_CONFIG_JSON) process.env.MODEL_LIST_CONFIG_JSON = flags.MODEL_LIST_CONFIG_JSON
@@ -32,6 +32,9 @@ export class ChatMessage implements IChatMessage {
@Column({ nullable: true, type: 'text' })
fileUploads?: string
@Column({ nullable: true, type: 'text' })
action?: string | null
@Column()
chatType: string
@@ -0,0 +1,12 @@
import { MigrationInterface, QueryRunner } from 'typeorm'
export class AddActionToChatMessage1721078251523 implements MigrationInterface {
public async up(queryRunner: QueryRunner): Promise<void> {
const columnExists = await queryRunner.hasColumn('chat_message', 'action')
if (!columnExists) queryRunner.query(`ALTER TABLE \`chat_message\` ADD COLUMN \`action\` LONGTEXT;`)
}
public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`ALTER TABLE \`chat_message\` DROP COLUMN \`action\`;`)
}
}
@@ -20,6 +20,7 @@ import { AddLead1710832127079 } from './1710832127079-AddLead'
import { AddLeadToChatMessage1711538023578 } from './1711538023578-AddLeadToChatMessage'
import { AddAgentReasoningToChatMessage1714679514451 } from './1714679514451-AddAgentReasoningToChatMessage'
import { AddTypeToChatFlow1766759476232 } from './1766759476232-AddTypeToChatFlow'
import { AddActionToChatMessage1721078251523 } from './1721078251523-AddActionToChatMessage'
export const mariadbMigrations = [
Init1693840429259,
@@ -43,5 +44,6 @@ export const mariadbMigrations = [
AddLead1710832127079,
AddLeadToChatMessage1711538023578,
AddAgentReasoningToChatMessage1714679514451,
AddTypeToChatFlow1766759476232
AddTypeToChatFlow1766759476232,
AddActionToChatMessage1721078251523
]
@@ -0,0 +1,12 @@
import { MigrationInterface, QueryRunner } from 'typeorm'
export class AddActionToChatMessage1721078251523 implements MigrationInterface {
public async up(queryRunner: QueryRunner): Promise<void> {
const columnExists = await queryRunner.hasColumn('chat_message', 'action')
if (!columnExists) queryRunner.query(`ALTER TABLE \`chat_message\` ADD COLUMN \`action\` LONGTEXT;`)
}
public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`ALTER TABLE \`chat_message\` DROP COLUMN \`action\`;`)
}
}
@@ -20,6 +20,7 @@ import { AddLead1710832127079 } from './1710832127079-AddLead'
import { AddLeadToChatMessage1711538023578 } from './1711538023578-AddLeadToChatMessage'
import { AddAgentReasoningToChatMessage1714679514451 } from './1714679514451-AddAgentReasoningToChatMessage'
import { AddTypeToChatFlow1766759476232 } from './1766759476232-AddTypeToChatFlow'
import { AddActionToChatMessage1721078251523 } from './1721078251523-AddActionToChatMessage'
export const mysqlMigrations = [
Init1693840429259,
@@ -43,5 +44,6 @@ export const mysqlMigrations = [
AddLead1710832127079,
AddLeadToChatMessage1711538023578,
AddAgentReasoningToChatMessage1714679514451,
AddTypeToChatFlow1766759476232
AddTypeToChatFlow1766759476232,
AddActionToChatMessage1721078251523
]
@@ -0,0 +1,11 @@
import { MigrationInterface, QueryRunner } from 'typeorm'
export class AddActionToChatMessage1721078251523 implements MigrationInterface {
public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`ALTER TABLE "chat_message" ADD COLUMN IF NOT EXISTS "action" TEXT;`)
}
public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`ALTER TABLE "chat_message" DROP COLUMN "action";`)
}
}
@@ -21,6 +21,7 @@ import { AddLead1710832137905 } from './1710832137905-AddLead'
import { AddLeadToChatMessage1711538016098 } from './1711538016098-AddLeadToChatMessage'
import { AddAgentReasoningToChatMessage1714679514451 } from './1714679514451-AddAgentReasoningToChatMessage'
import { AddTypeToChatFlow1766759476232 } from './1766759476232-AddTypeToChatFlow'
import { AddActionToChatMessage1721078251523 } from './1721078251523-AddActionToChatMessage'
export const postgresMigrations = [
Init1693891895163,
@@ -45,5 +46,6 @@ export const postgresMigrations = [
AddLead1710832137905,
AddLeadToChatMessage1711538016098,
AddAgentReasoningToChatMessage1714679514451,
AddTypeToChatFlow1766759476232
AddTypeToChatFlow1766759476232,
AddActionToChatMessage1721078251523
]
@@ -0,0 +1,11 @@
import { MigrationInterface, QueryRunner } from 'typeorm'
export class AddActionToChatMessage1721078251523 implements MigrationInterface {
public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`ALTER TABLE "chat_message" ADD COLUMN "action" TEXT;`)
}
public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`ALTER TABLE "chat_message" DROP COLUMN "action";`)
}
}
@@ -20,6 +20,7 @@ import { AddLead1710832117612 } from './1710832117612-AddLead'
import { AddLeadToChatMessage1711537986113 } from './1711537986113-AddLeadToChatMessage'
import { AddAgentReasoningToChatMessage1714679514451 } from './1714679514451-AddAgentReasoningToChatMessage'
import { AddTypeToChatFlow1766759476232 } from './1766759476232-AddTypeToChatFlow'
import { AddActionToChatMessage1721078251523 } from './1721078251523-AddActionToChatMessage'
export const sqliteMigrations = [
Init1693835579790,
@@ -43,5 +44,6 @@ export const sqliteMigrations = [
AddLead1710832117612,
AddLeadToChatMessage1711537986113,
AddAgentReasoningToChatMessage1714679514451,
AddTypeToChatFlow1766759476232
AddTypeToChatFlow1766759476232,
AddActionToChatMessage1721078251523
]
@@ -46,8 +46,8 @@ const checkIfChatflowIsValidForStreaming = async (chatflowId: string): Promise<a
isStreaming = isFlowValidForStream(nodes, endingNodeData)
}
// If it is a Multi Agents, always enable streaming
if (endingNodes.filter((node) => node.data.category === 'Multi Agents').length > 0) {
// If it is a Multi/Sequential Agents, always enable streaming
if (endingNodes.filter((node) => node.data.category === 'Multi Agents' || node.data.category === 'Sequential Agents').length > 0) {
return { isStreaming: true }
}
+737 -93
View File
@@ -2,22 +2,48 @@ import {
ICommonObject,
IMultiAgentNode,
IAgentReasoning,
IAction,
ITeamState,
ConsoleCallbackHandler,
additionalCallbacks
additionalCallbacks,
ISeqAgentsState,
ISeqAgentNode,
IUsedTool,
IDocument
} from 'flowise-components'
import { IChatFlow, IComponentNodes, IDepthQueue, IReactFlowNode, IReactFlowObject } from '../Interface'
import { Server } from 'socket.io'
import { buildFlow, getStartingNodes, getEndingNodes, constructGraphs, databaseEntities } from '../utils'
import { getRunningExpressApp } from './getRunningExpressApp'
import logger from './logger'
import { StateGraph, END } from '@langchain/langgraph'
import { BaseMessage, HumanMessage } from '@langchain/core/messages'
import { cloneDeep, flatten } from 'lodash'
import { replaceInputsWithConfig, resolveVariables } from '.'
import { omit, cloneDeep, flatten, uniq } from 'lodash'
import { StateGraph, END, START } from '@langchain/langgraph'
import { Document } from '@langchain/core/documents'
import { StatusCodes } from 'http-status-codes'
import { v4 as uuidv4 } from 'uuid'
import { StructuredTool } from '@langchain/core/tools'
import { BaseMessage, HumanMessage, AIMessage, AIMessageChunk, ToolMessage } from '@langchain/core/messages'
import {
IChatFlow,
IComponentNodes,
IDepthQueue,
IReactFlowNode,
IReactFlowObject,
IReactFlowEdge,
IMessage,
IncomingInput
} from '../Interface'
import {
buildFlow,
getStartingNodes,
getEndingNodes,
constructGraphs,
databaseEntities,
getSessionChatHistory,
getMemorySessionId,
clearSessionMemory
} from '../utils'
import { getRunningExpressApp } from './getRunningExpressApp'
import { replaceInputsWithConfig, resolveVariables } from '.'
import { InternalFlowiseError } from '../errors/internalFlowiseError'
import { getErrorMessage } from '../errors/utils'
import logger from './logger'
/**
* Build Agent Graph
@@ -25,6 +51,7 @@ import { getErrorMessage } from '../errors/utils'
* @param {string} chatId
* @param {string} sessionId
* @param {ICommonObject} incomingInput
* @param {boolean} isInternal
* @param {string} baseURL
* @param {Server} socketIO
*/
@@ -32,7 +59,8 @@ export const buildAgentGraph = async (
chatflow: IChatFlow,
chatId: string,
sessionId: string,
incomingInput: ICommonObject,
incomingInput: IncomingInput,
isInternal: boolean,
baseURL?: string,
socketIO?: Server
): Promise<any> => {
@@ -65,27 +93,42 @@ export const buildAgentGraph = async (
}
startingNodeIds = [...new Set(startingNodeIds)]
/*** Get Memory Node for Chat History ***/
let chatHistory: IMessage[] = []
const memoryNode = nodes.find((node) => node.data.name === 'agentMemory')
if (memoryNode) {
chatHistory = await getSessionChatHistory(
chatflowid,
getMemorySessionId(memoryNode, incomingInput, chatId, isInternal),
memoryNode,
appServer.nodesPool.componentNodes,
appServer.AppDataSource,
databaseEntities,
logger,
incomingInput.history
)
}
// Initialize nodes like ChatModels, Tools, etc.
const reactFlowNodes = await buildFlow(
const reactFlowNodes: IReactFlowNode[] = await buildFlow({
startingNodeIds,
nodes,
edges,
reactFlowNodes: nodes,
reactFlowEdges: edges,
graph,
depthQueue,
appServer.nodesPool.componentNodes,
incomingInput.question,
[],
componentNodes: appServer.nodesPool.componentNodes,
question: incomingInput.question,
chatHistory,
chatId,
sessionId,
chatflowid,
appServer.AppDataSource,
incomingInput?.overrideConfig,
appServer.cachePool,
false,
undefined,
incomingInput.uploads,
appDataSource: appServer.AppDataSource,
overrideConfig: incomingInput?.overrideConfig,
cachePool: appServer.cachePool,
isUpsert: false,
uploads: incomingInput.uploads,
baseURL
)
})
const options = {
chatId,
@@ -103,92 +146,265 @@ export const buildAgentGraph = async (
let streamResults
let finalResult = ''
let finalSummarization = ''
let agentReasoning: IAgentReasoning[] = []
let isSequential = false
let lastMessageRaw = {} as AIMessageChunk
let finalAction: IAction = {}
let totalSourceDocuments: IDocument[] = []
let totalUsedTools: IUsedTool[] = []
const workerNodes: IReactFlowNode[] = reactFlowNodes.filter((node: IReactFlowNode) => node.data.name === 'worker')
const supervisorNodes: IReactFlowNode[] = reactFlowNodes.filter((node: IReactFlowNode) => node.data.name === 'supervisor')
const workerNodes = reactFlowNodes.filter((node) => node.data.name === 'worker')
const supervisorNodes = reactFlowNodes.filter((node) => node.data.name === 'supervisor')
const seqAgentNodes = reactFlowNodes.filter((node) => node.data.category === 'Sequential Agents')
const mapNameToLabel: Record<string, string> = {}
const mapNameToLabel: Record<string, { label: string; nodeName: string }> = {}
for (const node of [...workerNodes, ...supervisorNodes]) {
mapNameToLabel[node.data.instance.name] = node.data.instance.label
for (const node of [...workerNodes, ...supervisorNodes, ...seqAgentNodes]) {
if (!Object.prototype.hasOwnProperty.call(mapNameToLabel, node.data.instance.name)) {
mapNameToLabel[node.data.instance.name] = {
label: node.data.instance.label,
nodeName: node.data.name
}
}
}
try {
streamResults = await compileGraph(
chatflow,
mapNameToLabel,
reactFlowNodes,
endingNodeIds,
appServer.nodesPool.componentNodes,
options,
startingNodeIds,
incomingInput.question,
incomingInput?.overrideConfig
)
if (!seqAgentNodes.length) {
streamResults = await compileMultiAgentsGraph(
chatflow,
mapNameToLabel,
reactFlowNodes,
endingNodeIds,
appServer.nodesPool.componentNodes,
options,
startingNodeIds,
incomingInput.question,
chatHistory,
incomingInput?.overrideConfig,
sessionId || chatId
)
} else {
isSequential = true
streamResults = await compileSeqAgentsGraph(
depthQueue,
chatflow,
reactFlowNodes,
edges,
appServer.nodesPool.componentNodes,
options,
incomingInput.question,
chatHistory,
incomingInput?.overrideConfig,
sessionId || chatId,
incomingInput.action
)
}
if (streamResults) {
let isStreamingStarted = false
for await (const output of await streamResults) {
if (!output?.__end__) {
const agentName = Object.keys(output)[0]
const usedTools = output[agentName]?.messages
? output[agentName].messages.map((msg: any) => msg.additional_kwargs?.usedTools)
: []
const sourceDocuments = output[agentName]?.messages
? output[agentName].messages.map((msg: any) => msg.additional_kwargs?.sourceDocuments)
: []
const messages = output[agentName]?.messages ? output[agentName].messages.map((msg: any) => msg.content) : []
const reasoning = {
agentName: mapNameToLabel[agentName],
messages,
next: output[agentName]?.next,
instructions: output[agentName]?.instructions,
usedTools: flatten(usedTools),
sourceDocuments: flatten(sourceDocuments)
}
agentReasoning.push(reasoning)
if (socketIO && incomingInput.socketIOClientId) {
if (!isStreamingStarted) {
isStreamingStarted = true
socketIO.to(incomingInput.socketIOClientId).emit('start', JSON.stringify(agentReasoning))
for (const agentName of Object.keys(output)) {
if (!mapNameToLabel[agentName]) continue
const nodeId = output[agentName]?.messages
? output[agentName].messages[output[agentName].messages.length - 1]?.additional_kwargs?.nodeId
: ''
const usedTools = output[agentName]?.messages
? output[agentName].messages.map((msg: BaseMessage) => msg.additional_kwargs?.usedTools)
: []
const sourceDocuments = output[agentName]?.messages
? output[agentName].messages.map((msg: BaseMessage) => msg.additional_kwargs?.sourceDocuments)
: []
const messages = output[agentName]?.messages
? output[agentName].messages.map((msg: BaseMessage) => (typeof msg === 'string' ? msg : msg.content))
: []
lastMessageRaw = output[agentName]?.messages
? output[agentName].messages[output[agentName].messages.length - 1]
: {}
const state = omit(output[agentName], ['messages'])
if (usedTools && usedTools.length) {
const cleanedTools = usedTools.filter((tool: IUsedTool) => tool)
if (cleanedTools.length) totalUsedTools.push(...cleanedTools)
}
socketIO.to(incomingInput.socketIOClientId).emit('agentReasoning', JSON.stringify(agentReasoning))
if (sourceDocuments && sourceDocuments.length) {
const cleanedDocs = sourceDocuments.filter((documents: IDocument) => documents)
if (cleanedDocs.length) totalSourceDocuments.push(...cleanedDocs)
}
// Send loading next agent indicator
if (reasoning.next && reasoning.next !== 'FINISH' && reasoning.next !== 'END') {
socketIO
.to(incomingInput.socketIOClientId)
.emit('nextAgent', mapNameToLabel[reasoning.next] || reasoning.next)
/*
* Check if the next node is a condition node, if yes, then add the agent reasoning of the condition node
*/
if (isSequential) {
const inputEdges = edges.filter(
(edg) => edg.target === nodeId && edg.targetHandle.includes(`${nodeId}-input-sequentialNode`)
)
inputEdges.forEach((edge) => {
const parentNode = reactFlowNodes.find((nd) => nd.id === edge.source)
if (parentNode) {
if (parentNode.data.name.includes('seqCondition')) {
const newMessages = messages.slice(0, -1)
newMessages.push(mapNameToLabel[agentName].label)
const reasoning = {
agentName: parentNode.data.instance?.label || parentNode.data.type,
messages: newMessages,
nodeName: parentNode.data.name,
nodeId: parentNode.data.id
}
agentReasoning.push(reasoning)
}
}
})
}
const reasoning = {
agentName: mapNameToLabel[agentName].label,
messages,
next: output[agentName]?.next,
instructions: output[agentName]?.instructions,
usedTools: flatten(usedTools) as IUsedTool[],
sourceDocuments: flatten(sourceDocuments) as Document[],
state,
nodeName: isSequential ? mapNameToLabel[agentName].nodeName : undefined,
nodeId
}
agentReasoning.push(reasoning)
finalSummarization = output[agentName]?.summarization ?? ''
if (socketIO && incomingInput.socketIOClientId) {
if (!isStreamingStarted) {
isStreamingStarted = true
socketIO.to(incomingInput.socketIOClientId).emit('start', agentReasoning)
}
socketIO.to(incomingInput.socketIOClientId).emit('agentReasoning', agentReasoning)
// Send loading next agent indicator
if (reasoning.next && reasoning.next !== 'FINISH' && reasoning.next !== 'END') {
socketIO
.to(incomingInput.socketIOClientId)
.emit('nextAgent', mapNameToLabel[reasoning.next].label || reasoning.next)
}
}
}
} else {
finalResult = output.__end__.messages.length ? output.__end__.messages.pop()?.content : ''
if (Array.isArray(finalResult)) finalResult = output.__end__.instructions
if (finalResult === incomingInput.question) {
const supervisorNode = reactFlowNodes.find((node: IReactFlowNode) => node.data.name === 'supervisor')
const llm = supervisorNode?.data?.instance?.llm
if (llm) {
const res = await llm.invoke(incomingInput.question)
finalResult = res?.content
}
}
if (socketIO && incomingInput.socketIOClientId) {
socketIO.to(incomingInput.socketIOClientId).emit('token', finalResult)
}
}
}
return { finalResult, agentReasoning }
/*
* For multi agents mode, sometimes finalResult is empty
* Provide summary as final result
*/
if (!isSequential && !finalResult && finalSummarization) {
finalResult = finalSummarization
if (socketIO && incomingInput.socketIOClientId) {
socketIO.to(incomingInput.socketIOClientId).emit('token', finalResult)
}
}
/*
* For sequential mode, sometimes finalResult is empty
* Use last agent message as final result
*/
if (isSequential && !finalResult && agentReasoning.length) {
const lastMessages = agentReasoning[agentReasoning.length - 1].messages
const lastAgentReasoningMessage = lastMessages[lastMessages.length - 1]
// If last message is an AI Message with tool calls, that means the last node was interrupted
if (lastMessageRaw.tool_calls && lastMessageRaw.tool_calls.length > 0) {
// The last node that got interrupted
const node = reactFlowNodes.find((node) => node.id === lastMessageRaw.additional_kwargs.nodeId)
// Find the next tool node that is connected to the interrupted node, to get the approve/reject button text
const tooNodeId = edges.find(
(edge) =>
edge.target.includes('seqToolNode') &&
edge.source === (lastMessageRaw.additional_kwargs && lastMessageRaw.additional_kwargs.nodeId)
)?.target
const connectedToolNode = reactFlowNodes.find((node) => node.id === tooNodeId)
// Map raw tool calls to used tools, to be shown on interrupted message
const mappedToolCalls = lastMessageRaw.tool_calls.map((toolCall) => {
return { tool: toolCall.name, toolInput: toolCall.args, toolOutput: '' }
})
// Emit the interrupt message to the client
let approveButtonText = 'Yes'
let rejectButtonText = 'No'
if (connectedToolNode || node) {
if (connectedToolNode) {
const result = await connectedToolNode.data.instance.node.seekPermissionMessage(mappedToolCalls)
finalResult = result || 'Do you want to proceed?'
approveButtonText = connectedToolNode.data.inputs?.approveButtonText || 'Yes'
rejectButtonText = connectedToolNode.data.inputs?.rejectButtonText || 'No'
} else if (node) {
const result = await node.data.instance.agentInterruptToolNode.seekPermissionMessage(mappedToolCalls)
finalResult = result || 'Do you want to proceed?'
approveButtonText = node.data.inputs?.approveButtonText || 'Yes'
rejectButtonText = node.data.inputs?.rejectButtonText || 'No'
}
finalAction = {
id: uuidv4(),
mapping: { approve: approveButtonText, reject: rejectButtonText, toolCalls: lastMessageRaw.tool_calls },
elements: [
{ type: 'approve-button', label: approveButtonText },
{ type: 'reject-button', label: rejectButtonText }
]
}
if (socketIO && incomingInput.socketIOClientId) {
socketIO.to(incomingInput.socketIOClientId).emit('token', finalResult)
socketIO.to(incomingInput.socketIOClientId).emit('action', finalAction)
}
}
totalUsedTools.push(...mappedToolCalls)
} else if (lastAgentReasoningMessage) {
finalResult = lastAgentReasoningMessage
if (socketIO && incomingInput.socketIOClientId) {
socketIO.to(incomingInput.socketIOClientId).emit('token', finalResult)
}
}
}
totalSourceDocuments = uniq(flatten(totalSourceDocuments))
totalUsedTools = uniq(flatten(totalUsedTools))
if (socketIO && incomingInput.socketIOClientId) {
socketIO.to(incomingInput.socketIOClientId).emit('usedTools', totalUsedTools)
socketIO.to(incomingInput.socketIOClientId).emit('sourceDocuments', totalSourceDocuments)
socketIO.to(incomingInput.socketIOClientId).emit('end')
}
return {
finalResult,
finalAction,
sourceDocuments: totalSourceDocuments,
usedTools: totalUsedTools,
agentReasoning
}
}
} catch (e) {
if (socketIO && incomingInput.socketIOClientId) {
socketIO.to(incomingInput.socketIOClientId).emit('abort')
// clear agent memory because checkpoints were saved during runtime
await clearSessionMemory(nodes, appServer.nodesPool.componentNodes, chatId, appServer.AppDataSource, sessionId)
if (getErrorMessage(e).includes('Aborted')) {
if (socketIO && incomingInput.socketIOClientId) {
socketIO.to(incomingInput.socketIOClientId).emit('abort')
}
return { finalResult, agentReasoning }
}
return { finalResult, agentReasoning }
throw new Error(getErrorMessage(e))
}
return streamResults
} catch (e) {
@@ -198,9 +414,9 @@ export const buildAgentGraph = async (
}
/**
* Compile Graph
* Compile Multi Agents Graph
* @param {IChatFlow} chatflow
* @param {Record<string, string>} mapNameToLabel
* @param {Record<string, {label: string, nodeName: string }>} mapNameToLabel
* @param {IReactFlowNode[]} reactflowNodes
* @param {string[]} workerNodeIds
* @param {IComponentNodes} componentNodes
@@ -208,17 +424,20 @@ export const buildAgentGraph = async (
* @param {string[]} startingNodeIds
* @param {string} question
* @param {ICommonObject} overrideConfig
* @param {string} threadId
*/
const compileGraph = async (
const compileMultiAgentsGraph = async (
chatflow: IChatFlow,
mapNameToLabel: Record<string, string>,
mapNameToLabel: Record<string, { label: string; nodeName: string }>,
reactflowNodes: IReactFlowNode[] = [],
workerNodeIds: string[],
componentNodes: IComponentNodes,
options: ICommonObject,
startingNodeIds: string[],
question: string,
overrideConfig?: ICommonObject
chatHistory: IMessage[] = [],
overrideConfig?: ICommonObject,
threadId?: string
) => {
const appServer = getRunningExpressApp()
const channels: ITeamState = {
@@ -228,7 +447,8 @@ const compileGraph = async (
},
next: 'initialState',
instructions: "Solve the user's request.",
team_members: []
team_members: [],
summarization: 'summarize'
}
const workflowGraph = new StateGraph<ITeamState>({
@@ -248,7 +468,7 @@ const compileGraph = async (
let flowNodeData = cloneDeep(workerNode.data)
if (overrideConfig) flowNodeData = replaceInputsWithConfig(flowNodeData, overrideConfig)
flowNodeData = resolveVariables(flowNodeData, reactflowNodes, question, [])
flowNodeData = resolveVariables(flowNodeData, reactflowNodes, question, chatHistory)
try {
const workerResult: IMultiAgentNode = await newNodeInstance.init(flowNodeData, question, options)
@@ -268,7 +488,7 @@ const compileGraph = async (
// Init supervisor nodes
for (const supervisor in supervisorWorkers) {
const supervisorInputLabel = mapNameToLabel[supervisor]
const supervisorInputLabel = mapNameToLabel[supervisor].label
const supervisorNode = reactflowNodes.find((node) => supervisorInputLabel === node.data.inputs?.supervisorName)
if (!supervisorNode) continue
@@ -279,7 +499,7 @@ const compileGraph = async (
let flowNodeData = cloneDeep(supervisorNode.data)
if (overrideConfig) flowNodeData = replaceInputsWithConfig(flowNodeData, overrideConfig)
flowNodeData = resolveVariables(flowNodeData, reactflowNodes, question, [])
flowNodeData = resolveVariables(flowNodeData, reactflowNodes, question, chatHistory)
if (flowNodeData.inputs) flowNodeData.inputs.workerNodes = supervisorWorkers[supervisor]
@@ -300,6 +520,7 @@ const compileGraph = async (
workflowGraph.addNode(supervisorResult.name, supervisorResult.node)
for (const worker of supervisorResult.workers) {
//@ts-ignore
workflowGraph.addEdge(worker, supervisorResult.name)
}
@@ -308,12 +529,14 @@ const compileGraph = async (
conditionalEdges[supervisorResult.workers[i]] = supervisorResult.workers[i]
}
//@ts-ignore
workflowGraph.addConditionalEdges(supervisorResult.name, (x: ITeamState) => x.next, {
...conditionalEdges,
FINISH: END
})
workflowGraph.setEntryPoint(supervisorResult.name)
//@ts-ignore
workflowGraph.addEdge(START, supervisorResult.name)
// Add agentflow to pool
;(workflowGraph as any).signal = options.signal
@@ -324,22 +547,443 @@ const compileGraph = async (
overrideConfig
)
// TODO: add persistence
// const memory = new MemorySaver()
const graph = workflowGraph.compile()
// Get memory
let memory = supervisorResult?.checkpointMemory
const graph = workflowGraph.compile({ checkpointer: memory })
const loggerHandler = new ConsoleCallbackHandler(logger)
const callbacks = await additionalCallbacks(flowNodeData, options)
const config = { configurable: { thread_id: threadId } }
// Return stream result as we should only have 1 supervisor
return await graph.stream(
{
messages: [new HumanMessage({ content: question })]
},
{ recursionLimit: supervisorResult?.recursionLimit ?? 100, callbacks: [loggerHandler, ...callbacks] }
{ recursionLimit: supervisorResult?.recursionLimit ?? 100, callbacks: [loggerHandler, ...callbacks], configurable: config }
)
} catch (e) {
throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, `Error initialize supervisor nodes - ${getErrorMessage(e)}`)
}
}
}
/**
* Compile Seq Agents Graph
* @param {IDepthQueue} depthQueue
* @param {IChatFlow} chatflow
* @param {IReactFlowNode[]} reactflowNodes
* @param {IReactFlowEdge[]} reactflowEdges
* @param {IComponentNodes} componentNodes
* @param {ICommonObject} options
* @param {string} question
* @param {IMessage[]} chatHistory
* @param {ICommonObject} overrideConfig
* @param {string} threadId
* @param {IAction} action
*/
const compileSeqAgentsGraph = async (
depthQueue: IDepthQueue,
chatflow: IChatFlow,
reactflowNodes: IReactFlowNode[] = [],
reactflowEdges: IReactFlowEdge[] = [],
componentNodes: IComponentNodes,
options: ICommonObject,
question: string,
chatHistory: IMessage[] = [],
overrideConfig?: ICommonObject,
threadId?: string,
action?: IAction
) => {
const appServer = getRunningExpressApp()
let channels: ISeqAgentsState = {
messages: {
value: (x: BaseMessage[], y: BaseMessage[]) => x.concat(y),
default: () => []
}
}
// Get state
const seqStateNode = reactflowNodes.find((node: IReactFlowNode) => node.data.name === 'seqState')
if (seqStateNode) {
channels = {
...seqStateNode.data.instance.node,
...channels
}
}
let seqGraph = new StateGraph<any>({
//@ts-ignore
channels
})
/*** Validate Graph ***/
const startAgentNodes: IReactFlowNode[] = reactflowNodes.filter((node: IReactFlowNode) => node.data.name === 'seqStart')
if (!startAgentNodes.length) throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, 'Start node not found')
if (startAgentNodes.length > 1)
throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, 'Graph should have only one start node')
const endAgentNodes: IReactFlowNode[] = reactflowNodes.filter((node: IReactFlowNode) => node.data.name === 'seqEnd')
const loopNodes: IReactFlowNode[] = reactflowNodes.filter((node: IReactFlowNode) => node.data.name === 'seqLoop')
if (!endAgentNodes.length && !loopNodes.length) {
throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, 'Graph should have at least one End/Loop node')
}
/*** End of Validation ***/
let flowNodeData
let conditionalEdges: Record<string, { nodes: Record<string, string>; func: any }> = {}
let interruptedRouteMapping: Record<string, Record<string, string>> = {}
let conditionalToolNodes: Record<string, { source: ISeqAgentNode; toolNodes: ISeqAgentNode[] }> = {}
let bindModel: Record<string, any> = {}
let interruptToolNodeNames = []
const initiateNode = async (node: IReactFlowNode) => {
const nodeInstanceFilePath = componentNodes[node.data.name].filePath as string
const nodeModule = await import(nodeInstanceFilePath)
const newNodeInstance = new nodeModule.nodeClass()
flowNodeData = cloneDeep(node.data)
if (overrideConfig) flowNodeData = replaceInputsWithConfig(flowNodeData, overrideConfig)
flowNodeData = resolveVariables(flowNodeData, reactflowNodes, question, chatHistory)
const seqAgentNode: ISeqAgentNode = await newNodeInstance.init(flowNodeData, question, options)
return seqAgentNode
}
/*
* Two objectives we want to achieve here:
* 1.) Prepare the mapping of conditional outputs to next nodes. This mapping will ONLY be used to add conditional edges to the Interrupted Agent connected next to Condition/ConditionAgent Node.
* For example, if the condition node has 2 outputs 'Yes' and 'No', and 'Yes' leads to 'agentName1' and 'No' leads to 'agentName2', then the mapping should be like:
* {
* <conditionNodeId>: { 'Yes': 'agentName1', 'No': 'agentName2' }
* }
* 2.) With the interruptedRouteMapping object, avoid adding conditional edges to the Interrupted Agent for the nodes that are already interrupted by tools. It will be separately added from the function - agentInterruptToolFunc
*/
const processInterruptedRouteMapping = (conditionNodeId: string) => {
const conditionEdges = reactflowEdges.filter((edge) => edge.source === conditionNodeId) ?? []
for (const conditionEdge of conditionEdges) {
const nextNodeId = conditionEdge.target
const conditionNodeOutputAnchorId = conditionEdge.sourceHandle
const nextNode = reactflowNodes.find((node) => node.id === nextNodeId)
if (!nextNode) continue
const conditionNode = reactflowNodes.find((node) => node.id === conditionNodeId)
if (!conditionNode) continue
const outputAnchors = conditionNode?.data.outputAnchors
if (!outputAnchors || !outputAnchors.length || !outputAnchors[0].options) continue
const conditionOutputAnchorLabel =
outputAnchors[0].options.find((option: any) => option.id === conditionNodeOutputAnchorId)?.label ?? ''
if (!conditionOutputAnchorLabel) continue
if (Object.prototype.hasOwnProperty.call(interruptedRouteMapping, conditionNodeId)) {
interruptedRouteMapping[conditionNodeId] = {
...interruptedRouteMapping[conditionNodeId],
[conditionOutputAnchorLabel]: nextNode.data.instance.name
}
} else {
interruptedRouteMapping[conditionNodeId] = {
[conditionOutputAnchorLabel]: nextNode.data.instance.name
}
}
}
}
/*
* Prepare Conditional Edges
* Example: {
* 'seqCondition_1': { nodes: { 'Yes': 'agentName1', 'No': 'agentName2' }, func: <condition-function>, disabled: true },
* 'seqCondition_2': { nodes: { 'Yes': 'agentName3', 'No': 'agentName4' }, func: <condition-function> }
* }
*/
const prepareConditionalEdges = (nodeId: string, nodeInstance: ISeqAgentNode) => {
const conditionEdges = reactflowEdges.filter((edge) => edge.target === nodeId && edge.source.includes('seqCondition')) ?? []
for (const conditionEdge of conditionEdges) {
const conditionNodeId = conditionEdge.source
const conditionNodeOutputAnchorId = conditionEdge.sourceHandle
const conditionNode = reactflowNodes.find((node) => node.id === conditionNodeId)
const outputAnchors = conditionNode?.data.outputAnchors
if (!outputAnchors || !outputAnchors.length || !outputAnchors[0].options) continue
const conditionOutputAnchorLabel =
outputAnchors[0].options.find((option: any) => option.id === conditionNodeOutputAnchorId)?.label ?? ''
if (!conditionOutputAnchorLabel) continue
if (Object.prototype.hasOwnProperty.call(conditionalEdges, conditionNodeId)) {
conditionalEdges[conditionNodeId] = {
...conditionalEdges[conditionNodeId],
nodes: { ...conditionalEdges[conditionNodeId].nodes, [conditionOutputAnchorLabel]: nodeInstance.name }
}
} else {
conditionalEdges[conditionNodeId] = {
nodes: { [conditionOutputAnchorLabel]: nodeInstance.name },
func: conditionNode.data.instance.node
}
}
}
}
/*
* Prepare Conditional Tool Edges. This is just for LLMNode -> ToolNode
* Example: {
* 'agent_1': { source: agent, toolNodes: [node] }
* }
*/
const prepareLLMToToolEdges = (predecessorAgent: ISeqAgentNode, toolNodeInstance: ISeqAgentNode) => {
if (Object.prototype.hasOwnProperty.call(conditionalToolNodes, predecessorAgent.id)) {
const toolNodes = conditionalToolNodes[predecessorAgent.id].toolNodes
toolNodes.push(toolNodeInstance)
conditionalToolNodes[predecessorAgent.id] = { source: predecessorAgent, toolNodes }
} else {
conditionalToolNodes[predecessorAgent.id] = {
source: predecessorAgent,
toolNodes: [toolNodeInstance]
}
}
}
/*** This is to bind the tools to the model of LLMNode, when the LLMNode is predecessor/successor of ToolNode ***/
const createBindModel = (agent: ISeqAgentNode, toolNodeInstance: ISeqAgentNode) => {
const tools = flatten(toolNodeInstance.node?.tools)
bindModel[agent.id] = agent.llm.bindTools(tools)
}
/*** Start processing every Agent nodes ***/
for (const agentNodeId of getSortedDepthNodes(depthQueue)) {
const agentNode = reactflowNodes.find((node) => node.id === agentNodeId)
if (!agentNode) continue
const eligibleSeqNodes = ['seqAgent', 'seqEnd', 'seqLoop', 'seqToolNode', 'seqLLMNode']
const nodesToAdd = ['seqAgent', 'seqToolNode', 'seqLLMNode']
if (eligibleSeqNodes.includes(agentNode.data.name)) {
try {
const agentInstance: ISeqAgentNode = await initiateNode(agentNode)
if (nodesToAdd.includes(agentNode.data.name)) {
// Add node to graph
seqGraph.addNode(agentInstance.name, agentInstance.node)
/*
* If it is an Interrupted Agent, we want to:
* 1.) Add conditional edges to the Interrupted Agent via agentInterruptToolFunc
* 2.) Add agent to the interruptToolNodeNames list
*/
if (agentInstance.type === 'agent' && agentNode.data.inputs?.interrupt) {
interruptToolNodeNames.push(agentInstance.agentInterruptToolNode.name)
const nextNodeId = reactflowEdges.find((edge) => edge.source === agentNode.id)?.target
const nextNode = reactflowNodes.find((node) => node.id === nextNodeId)
let nextNodeSeqAgentName = ''
if (nextNodeId && nextNode) {
nextNodeSeqAgentName = nextNode.data.instance.name
// If next node is Condition Node, process the interrupted route mapping, see more details from comments of processInterruptedRouteMapping
if (nextNode.data.name.includes('seqCondition')) {
const conditionNode = nextNodeId
processInterruptedRouteMapping(conditionNode)
seqGraph = await agentInstance.agentInterruptToolFunc(
seqGraph,
undefined,
nextNode.data.instance.node,
interruptedRouteMapping[conditionNode]
)
} else {
seqGraph = await agentInstance.agentInterruptToolFunc(seqGraph, nextNodeSeqAgentName)
}
} else {
seqGraph = await agentInstance.agentInterruptToolFunc(seqGraph, nextNodeSeqAgentName)
}
}
}
if (agentInstance.predecessorAgents) {
const predecessorAgents: ISeqAgentNode[] = agentInstance.predecessorAgents
const edges = []
for (const predecessorAgent of predecessorAgents) {
// Add start edge and set entry point
if (predecessorAgent.name === START) {
if (agentInstance.moderations && agentInstance.moderations.length > 0) {
try {
for (const moderation of agentInstance.moderations) {
question = await moderation.checkForViolations(question)
}
} catch (e) {
throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, getErrorMessage(e))
}
}
//@ts-ignore
seqGraph.addEdge(START, agentInstance.name)
} else if (predecessorAgent.type === 'condition') {
/*
* If current node is Condition Node, AND predecessor is an Interrupted Agent
* Don't add conditional edges to the Interrupted Agent, as it will be added separately from the function - agentInterruptToolFunc
*/
if (!Object.prototype.hasOwnProperty.call(interruptedRouteMapping, predecessorAgent.id)) {
prepareConditionalEdges(agentNode.data.id, agentInstance)
}
} else if (agentNode.data.name === 'seqToolNode') {
// Prepare the conditional edges for LLMNode -> ToolNode AND bind the tools to LLMNode
prepareLLMToToolEdges(predecessorAgent, agentInstance)
createBindModel(predecessorAgent, agentInstance)
// If current ToolNode has interrupt turned on, add the ToolNode name to interruptToolNodeNames
if (agentInstance.node.interrupt) {
interruptToolNodeNames.push(agentInstance.name)
}
} else if (predecessorAgent.name) {
// In the scenario when ToolNode -> LLMNode, bind the tools to LLMNode
if (agentInstance.type === 'llm' && predecessorAgent.type === 'tool') {
createBindModel(agentInstance, predecessorAgent)
}
// Add edge to graph ONLY when predecessor is not an Interrupted Agent
if (!predecessorAgent.agentInterruptToolNode) {
edges.push(predecessorAgent.name)
}
}
}
// Edges can be multiple, in the case of parallel node executions
if (edges.length > 1) {
//@ts-ignore
seqGraph.addEdge(edges, agentInstance.name)
} else if (edges.length === 1) {
//@ts-ignore
seqGraph.addEdge(...edges, agentInstance.name)
}
}
} catch (e) {
throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, `Error initialize agent nodes - ${getErrorMessage(e)}`)
}
}
}
/*** Add conditional edges to graph for condition nodes ***/
for (const conditionNodeId in conditionalEdges) {
const startConditionEdges = reactflowEdges.filter((edge) => edge.target === conditionNodeId)
if (!startConditionEdges.length) continue
for (const startConditionEdge of startConditionEdges) {
const startConditionNode = reactflowNodes.find((node) => node.id === startConditionEdge.source)
if (!startConditionNode) continue
seqGraph.addConditionalEdges(
startConditionNode.data.instance.name,
conditionalEdges[conditionNodeId].func,
//@ts-ignore
conditionalEdges[conditionNodeId].nodes
)
}
}
/*** Add conditional edges to graph for LLMNode -> ToolNode ***/
for (const llmSourceNodeId in conditionalToolNodes) {
const connectedToolNodes = conditionalToolNodes[llmSourceNodeId].toolNodes
const sourceNode = conditionalToolNodes[llmSourceNodeId].source
const routeMessage = (state: ISeqAgentsState) => {
const messages = state.messages as unknown as BaseMessage[]
const lastMessage = messages[messages.length - 1] as AIMessage
if (!lastMessage.tool_calls?.length) {
return END
}
for (const toolCall of lastMessage.tool_calls) {
for (const toolNode of connectedToolNodes) {
const tools = (toolNode.node?.tools as StructuredTool[]) || ((toolNode as any).tools as StructuredTool[])
if (tools.some((tool) => tool.name === toolCall.name)) {
return toolNode.name
}
}
}
return END
}
seqGraph.addConditionalEdges(
//@ts-ignore
sourceNode.name,
routeMessage
)
}
/*** Add agentflow to pool ***/
;(seqGraph as any).signal = options.signal
appServer.chatflowPool.add(
`${chatflow.id}_${options.chatId}`,
seqGraph as any,
reactflowNodes.filter((node) => startAgentNodes.map((nd) => nd.id).includes(node.id)),
overrideConfig
)
/*** Get memory ***/
const startNode = reactflowNodes.find((node: IReactFlowNode) => node.data.name === 'seqStart')
let memory = startNode?.data.instance?.checkpointMemory
try {
const graph = seqGraph.compile({ checkpointer: memory, interruptBefore: interruptToolNodeNames as any })
const loggerHandler = new ConsoleCallbackHandler(logger)
const callbacks = await additionalCallbacks(flowNodeData as any, options)
const config = { configurable: { thread_id: threadId }, bindModel }
let humanMsg: { messages: HumanMessage[] | ToolMessage[] } | null = {
messages: [new HumanMessage({ content: question })]
}
if (action && action.mapping && question === action.mapping.approve) {
humanMsg = null
} else if (action && action.mapping && question === action.mapping.reject) {
humanMsg = {
messages: action.mapping.toolCalls.map((toolCall) => {
return new ToolMessage({
name: toolCall.name,
content: `Tool ${toolCall.name} call denied by user. Acknowledge that, and DONT perform further actions. Only ask if user have other questions`,
tool_call_id: toolCall.id!,
additional_kwargs: { toolCallsDenied: true }
})
})
}
}
return await graph.stream(humanMsg, { callbacks: [loggerHandler, ...callbacks], configurable: config })
} catch (e) {
logger.error('Error compile graph', e)
throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, `Error compile graph - ${getErrorMessage(e)}`)
}
}
const getSortedDepthNodes = (depthQueue: IDepthQueue) => {
// Step 1: Convert the object into an array of [key, value] pairs and sort them by the value
const sortedEntries = Object.entries(depthQueue).sort((a, b) => a[1] - b[1])
// Step 2: Group keys by their depth values
const groupedByDepth: Record<number, string[]> = {}
sortedEntries.forEach(([key, value]) => {
if (!groupedByDepth[value]) {
groupedByDepth[value] = []
}
groupedByDepth[value].push(key)
})
// Step 3: Create the final sorted array with grouped keys
const sortedArray: (string | string[])[] = []
Object.keys(groupedByDepth)
.sort((a, b) => parseInt(a) - parseInt(b))
.forEach((depth) => {
const items = groupedByDepth[parseInt(depth)]
sortedArray.push(...items)
})
return sortedArray.flat()
}
+56 -30
View File
@@ -43,6 +43,8 @@ import logger from './logger'
import { utilAddChatMessage } from './addChatMesage'
import { buildAgentGraph } from './buildAgentGraph'
import { getErrorMessage } from '../errors/utils'
import { ChatMessage } from '../database/entities/ChatMessage'
import { IAction } from 'flowise-components'
/**
* Build Chatflow
@@ -174,7 +176,7 @@ export const utilBuildChatflow = async (req: Request, socketIO?: Server, isInter
const endingNodes = getEndingNodes(nodeDependencies, directedGraph, nodes)
/*** If the graph is an agent graph, build the agent response ***/
if (endingNodes.filter((node) => node.data.category === 'Multi Agents').length) {
if (endingNodes.filter((node) => node.data.category === 'Multi Agents' || node.data.category === 'Sequential Agents').length) {
return await utilBuildAgentResponse(
chatflow,
isInternal,
@@ -292,28 +294,27 @@ export const utilBuildChatflow = async (req: Request, socketIO?: Server, isInter
logger.debug(`[server]: Start building chatflow ${chatflowid}`)
/*** BFS to traverse from Starting Nodes to Ending Node ***/
const reactFlowNodes = await buildFlow(
const reactFlowNodes = await buildFlow({
startingNodeIds,
nodes,
edges,
reactFlowNodes: nodes,
reactFlowEdges: edges,
graph,
depthQueue,
appServer.nodesPool.componentNodes,
incomingInput.question,
componentNodes: appServer.nodesPool.componentNodes,
question: incomingInput.question,
chatHistory,
chatId,
sessionId ?? '',
sessionId: sessionId ?? '',
chatflowid,
appServer.AppDataSource,
incomingInput?.overrideConfig,
appServer.cachePool,
false,
undefined,
incomingInput.uploads,
appDataSource: appServer.AppDataSource,
overrideConfig: incomingInput?.overrideConfig,
cachePool: appServer.cachePool,
isUpsert: false,
uploads: incomingInput.uploads,
baseURL,
socketIO,
incomingInput.socketIOClientId
)
socketIOClientId: incomingInput.socketIOClientId
})
const nodeToExecute =
endingNodeIds.length === 1
@@ -432,14 +433,14 @@ export const utilBuildChatflow = async (req: Request, socketIO?: Server, isInter
}
const utilBuildAgentResponse = async (
chatflow: IChatFlow,
agentflow: IChatFlow,
isInternal: boolean,
chatId: string,
memoryType: string,
sessionId: string,
userMessageDateTime: Date,
fileUploads: IFileUpload[],
incomingInput: ICommonObject,
incomingInput: IncomingInput,
nodes: IReactFlowNode[],
edges: IReactFlowEdge[],
socketIO?: Server,
@@ -447,13 +448,13 @@ const utilBuildAgentResponse = async (
) => {
try {
const appServer = getRunningExpressApp()
const streamResults = await buildAgentGraph(chatflow, chatId, sessionId, incomingInput, baseURL, socketIO)
const streamResults = await buildAgentGraph(agentflow, chatId, sessionId, incomingInput, isInternal, baseURL, socketIO)
if (streamResults) {
const { finalResult, agentReasoning } = streamResults
const { finalResult, finalAction, sourceDocuments, usedTools, agentReasoning } = streamResults
const userMessage: Omit<IChatMessage, 'id'> = {
role: 'userMessage',
content: incomingInput.question,
chatflowid: chatflow.id,
chatflowid: agentflow.id,
chatType: isInternal ? chatType.INTERNAL : chatType.EXTERNAL,
chatId,
memoryType,
@@ -467,23 +468,54 @@ const utilBuildAgentResponse = async (
const apiMessage: Omit<IChatMessage, 'id' | 'createdDate'> = {
role: 'apiMessage',
content: finalResult,
chatflowid: chatflow.id,
chatflowid: agentflow.id,
chatType: isInternal ? chatType.INTERNAL : chatType.EXTERNAL,
chatId,
memoryType,
sessionId
}
if (sourceDocuments.length) apiMessage.sourceDocuments = JSON.stringify(sourceDocuments)
if (usedTools.length) apiMessage.usedTools = JSON.stringify(usedTools)
if (agentReasoning.length) apiMessage.agentReasoning = JSON.stringify(agentReasoning)
if (Object.keys(finalAction).length) apiMessage.action = JSON.stringify(finalAction)
const chatMessage = await utilAddChatMessage(apiMessage)
await appServer.telemetry.sendTelemetry('prediction_sent', {
await appServer.telemetry.sendTelemetry('agentflow_prediction_sent', {
version: await getAppVersion(),
chatlowId: chatflow.id,
agentflowId: agentflow.id,
chatId,
type: isInternal ? chatType.INTERNAL : chatType.EXTERNAL,
flowGraph: getTelemetryFlowObj(nodes, edges)
})
// Find the previous chat message with the same action id and remove the action
if (incomingInput.action && Object.keys(incomingInput.action).length) {
let query = await appServer.AppDataSource.getRepository(ChatMessage)
.createQueryBuilder('chat_message')
.where('chat_message.chatId = :chatId', { chatId })
.orWhere('chat_message.sessionId = :sessionId', { sessionId })
.orderBy('chat_message.createdDate', 'DESC')
.getMany()
for (const result of query) {
if (result.action) {
try {
const action: IAction = JSON.parse(result.action)
if (action.id === incomingInput.action.id) {
const newChatMessage = new ChatMessage()
Object.assign(newChatMessage, result)
newChatMessage.action = null
const cm = await appServer.AppDataSource.getRepository(ChatMessage).create(newChatMessage)
await appServer.AppDataSource.getRepository(ChatMessage).save(cm)
break
}
} catch (e) {
// error converting action to JSON
}
}
}
}
// Prepare response
let result: ICommonObject = {}
result.text = finalResult
@@ -493,13 +525,7 @@ const utilBuildAgentResponse = async (
if (sessionId) result.sessionId = sessionId
if (memoryType) result.memoryType = memoryType
if (agentReasoning.length) result.agentReasoning = agentReasoning
await appServer.telemetry.sendTelemetry('graph_compiled', {
version: await getAppVersion(),
graphId: chatflow.id,
type: isInternal ? chatType.INTERNAL : chatType.EXTERNAL,
flowGraph: getTelemetryFlowObj(nodes, edges)
})
if (Object.keys(finalAction).length) result.action = finalAction
return result
}
@@ -18,7 +18,15 @@ export const utilGetUploadsConfig = async (chatflowid: string): Promise<any> =>
throw new InternalFlowiseError(StatusCodes.NOT_FOUND, `Chatflow ${chatflowid} not found`)
}
const uploadAllowedNodes = ['llmChain', 'conversationChain', 'reactAgentChat', 'conversationalAgent', 'toolAgent', 'supervisor']
const uploadAllowedNodes = [
'llmChain',
'conversationChain',
'reactAgentChat',
'conversationalAgent',
'toolAgent',
'supervisor',
'seqStart'
]
const uploadProcessingNodes = ['chatOpenAI', 'chatAnthropic', 'awsChatBedrock', 'azureChatOpenAI', 'chatGoogleGenerativeAI']
const flowObj = JSON.parse(chatflow.flowData)
+57 -37
View File
@@ -269,7 +269,8 @@ export const getEndingNodes = (
endingNodeData.category !== 'Chains' &&
endingNodeData.category !== 'Agents' &&
endingNodeData.category !== 'Engine' &&
endingNodeData.category !== 'Multi Agents'
endingNodeData.category !== 'Multi Agents' &&
endingNodeData.category !== 'Sequential Agents'
) {
error = new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, `Ending node must be either a Chain or Agent or Engine`)
continue
@@ -416,42 +417,55 @@ const checkIfDocLoaderShouldBeIgnored = (
return false
}
/**
* Build langchain from start to end
* @param {string[]} startingNodeIds
* @param {IReactFlowNode[]} reactFlowNodes
* @param {INodeDirectedGraph} graph
* @param {IDepthQueue} depthQueue
* @param {IComponentNodes} componentNodes
* @param {string} question
* @param {string} chatId
* @param {string} chatflowid
* @param {DataSource} appDataSource
* @param {ICommonObject} overrideConfig
* @param {CachePool} cachePool
*/
export const buildFlow = async (
startingNodeIds: string[],
reactFlowNodes: IReactFlowNode[],
reactFlowEdges: IReactFlowEdge[],
graph: INodeDirectedGraph,
depthQueue: IDepthQueue,
componentNodes: IComponentNodes,
question: string,
chatHistory: IMessage[],
chatId: string,
sessionId: string,
chatflowid: string,
appDataSource: DataSource,
overrideConfig?: ICommonObject,
cachePool?: CachePool,
isUpsert?: boolean,
stopNodeId?: string,
uploads?: IFileUpload[],
baseURL?: string,
socketIO?: Server,
type BuildFlowParams = {
startingNodeIds: string[]
reactFlowNodes: IReactFlowNode[]
reactFlowEdges: IReactFlowEdge[]
graph: INodeDirectedGraph
depthQueue: IDepthQueue
componentNodes: IComponentNodes
question: string
chatHistory: IMessage[]
chatId: string
sessionId: string
chatflowid: string
appDataSource: DataSource
overrideConfig?: ICommonObject
cachePool?: CachePool
isUpsert?: boolean
stopNodeId?: string
uploads?: IFileUpload[]
baseURL?: string
socketIO?: Server
socketIOClientId?: string
) => {
}
/**
* Build flow from start to end
* @param {BuildFlowParams} params
*/
export const buildFlow = async ({
startingNodeIds,
reactFlowNodes,
reactFlowEdges,
graph,
depthQueue,
componentNodes,
question,
chatHistory,
chatId,
sessionId,
chatflowid,
appDataSource,
overrideConfig,
cachePool,
isUpsert,
stopNodeId,
uploads,
baseURL,
socketIO,
socketIOClientId
}: BuildFlowParams) => {
const flowNodes = cloneDeep(reactFlowNodes)
let upsertHistory: Record<string, any> = {}
@@ -779,9 +793,15 @@ export const getVariableValue = (
const variablePaths = Object.keys(variableDict)
variablePaths.sort() // Sort by length of variable path because longer path could possibly contains nested variable
variablePaths.forEach((path) => {
const variableValue = variableDict[path]
let variableValue: object | string = variableDict[path]
// Replace all occurrence
if (typeof variableValue === 'object') {
// Just get the id of variableValue object if it is agentflow node, to avoid circular JSON error
if (Object.prototype.hasOwnProperty.call(variableValue, 'predecessorAgents')) {
const nodeId = variableValue['id']
variableValue = { id: nodeId }
}
const stringifiedValue = JSON.stringify(JSON.stringify(variableValue))
if (stringifiedValue.startsWith('"') && stringifiedValue.endsWith('"')) {
// get rid of the double quotes
+11 -11
View File
@@ -117,24 +117,24 @@ export const upsertVector = async (req: Request, isInternal: boolean = false) =>
const { startingNodeIds, depthQueue } = getStartingNodes(filteredGraph, stopNodeId)
const upsertedResult = await buildFlow(
const upsertedResult = await buildFlow({
startingNodeIds,
nodes,
edges,
filteredGraph,
reactFlowNodes: nodes,
reactFlowEdges: edges,
graph: filteredGraph,
depthQueue,
appServer.nodesPool.componentNodes,
incomingInput.question,
componentNodes: appServer.nodesPool.componentNodes,
question: incomingInput.question,
chatHistory,
chatId,
sessionId ?? '',
sessionId: sessionId ?? '',
chatflowid,
appServer.AppDataSource,
incomingInput?.overrideConfig,
appServer.cachePool,
appDataSource: appServer.AppDataSource,
overrideConfig: incomingInput?.overrideConfig,
cachePool: appServer.cachePool,
isUpsert,
stopNodeId
)
})
const startingNodes = nodes.filter((nd) => startingNodeIds.includes(nd.data.id))