Feature/lang graph (#2319)

* add langgraph

* datasource: initial commit

* datasource: datasource details and chunks

* datasource: Document Store Node

* more changes

* Document Store - Base functionality

* Document Store Loader Component

* Document Store Loader Component

* before merging the modularity PR

* after merging the modularity PR

* preview mode

* initial draft PR

* fixes

* minor updates and  fixes

* preview with loader and splitter

* preview with credential

* show stored chunks

* preview update...

* edit config

* save, preview and other changes

* save, preview and other changes

* save, process and other changes

* save, process and other changes

* alpha1 - for internal testing

* rerouting urls

* bug fix on new leader create

* pagination support for chunks

* delete document store

* Update pnpm-lock.yaml

* doc store card view

* Update store files to use updated storage functions, Document Store Table View and other changes

* ui changes

* add expanded chunk dialog, improve ui

* change throw Error to InternalError

* Bug Fixes and removal of subFolder, adding of view chunks for store

* lint fixes

* merge changes

* DocumentStoreStatus component

* ui changes for doc store

* add remove metadata key field, add custom document loader

* add chatflows used doc store chips

* add types/interfaces to DocumentStore Services

* document loader list dialog title bar color change

* update interfaces

* Whereused Chatflow Name and Added chunkNo to retain order of created chunks.

* use typeorm order chunkNo, ui changes

* update tabler icons react

* cleanup agents

* add pysandbox tool

* add abort functionality, loading next agent

* add empty view svg

* update chatflow tool with chatId

* rename to agentflows

* update worker for prompt input values

* update dashboard to agentflows, agentcanvas

* fix marketplace use template

* add agentflow templates

* resolve merge conflict

* update baseURL

---------

Co-authored-by: vinodkiran <vinodkiran@usa.net>
Co-authored-by: Vinod Paidimarry <vinodkiran@outlook.in>
This commit is contained in:
Henry Heng
2024-05-21 16:36:42 +01:00
committed by GitHub
parent 95f1090bed
commit 8ebc4dcfd5
92 changed files with 7216 additions and 701 deletions
+6 -1
View File
@@ -2,6 +2,8 @@ import { ICommonObject, IFileUpload, INode, INodeData as INodeDataFromComponent,
export type MessageType = 'apiMessage' | 'userMessage'
export type ChatflowType = 'CHATFLOW' | 'MULTIAGENT'
export enum chatType {
INTERNAL = 'INTERNAL',
EXTERNAL = 'EXTERNAL'
@@ -25,7 +27,9 @@ export interface IChatFlow {
apikeyid?: string
analytic?: string
chatbotConfig?: string
apiConfig?: any
apiConfig?: string
category?: string
type?: ChatflowType
}
export interface IChatMessage {
@@ -36,6 +40,7 @@ export interface IChatMessage {
sourceDocuments?: string
usedTools?: string
fileAnnotations?: string
agentReasoning?: string
fileUploads?: string
chatType: string
chatId: string
@@ -150,9 +150,25 @@ const removeAllChatMessages = async (req: Request, res: Response, next: NextFunc
}
}
const abortChatMessage = async (req: Request, res: Response, next: NextFunction) => {
try {
if (typeof req.params === 'undefined' || !req.params.chatflowid || !req.params.chatid) {
throw new InternalFlowiseError(
StatusCodes.PRECONDITION_FAILED,
`Error: chatMessagesController.abortChatMessage - chatflowid or chatid not provided!`
)
}
await chatMessagesService.abortChatMessage(req.params.chatid, req.params.chatflowid)
return res.json({ status: 200, message: 'Chat message aborted' })
} catch (error) {
next(error)
}
}
export default {
createChatMessage,
getAllChatMessages,
getAllInternalChatMessages,
removeAllChatMessages
removeAllChatMessages,
abortChatMessage
}
@@ -5,6 +5,7 @@ import { createRateLimiter } from '../../utils/rateLimit'
import { getApiKey } from '../../utils/apiKey'
import { InternalFlowiseError } from '../../errors/internalFlowiseError'
import { StatusCodes } from 'http-status-codes'
import { ChatflowType } from '../../Interface'
const checkIfChatflowIsValidForStreaming = async (req: Request, res: Response, next: NextFunction) => {
try {
@@ -50,7 +51,7 @@ const deleteChatflow = async (req: Request, res: Response, next: NextFunction) =
const getAllChatflows = async (req: Request, res: Response, next: NextFunction) => {
try {
const apiResponse = await chatflowsService.getAllChatflows()
const apiResponse = await chatflowsService.getAllChatflows(req.query?.type as ChatflowType)
return res.json(apiResponse)
} catch (error) {
next(error)
@@ -60,17 +61,17 @@ const getAllChatflows = async (req: Request, res: Response, next: NextFunction)
// Get specific chatflow via api key
const getChatflowByApiKey = async (req: Request, res: Response, next: NextFunction) => {
try {
if (typeof req.params === 'undefined' || !req.params.apiKey) {
if (typeof req.params === 'undefined' || !req.params.apikey) {
throw new InternalFlowiseError(
StatusCodes.PRECONDITION_FAILED,
`Error: chatflowsRouter.getChatflowByApiKey - apiKey not provided!`
`Error: chatflowsRouter.getChatflowByApiKey - apikey not provided!`
)
}
const apiKey = await getApiKey(req.params.apiKey)
if (!apiKey) {
const apikey = await getApiKey(req.params.apikey)
if (!apikey) {
return res.status(401).send('Unauthorized')
}
const apiResponse = await chatflowsService.getChatflowByApiKey(apiKey.id)
const apiResponse = await chatflowsService.getChatflowByApiKey(apikey.id)
return res.json(apiResponse)
} catch (error) {
next(error)
@@ -1,6 +1,6 @@
/* eslint-disable */
import { Entity, Column, CreateDateColumn, UpdateDateColumn, PrimaryGeneratedColumn } from 'typeorm'
import { IChatFlow } from '../../Interface'
import { ChatflowType, IChatFlow } from '../../Interface'
@Entity()
export class ChatFlow implements IChatFlow {
@@ -34,6 +34,12 @@ export class ChatFlow implements IChatFlow {
@Column({ nullable: true, type: 'text' })
speechToText?: string
@Column({ nullable: true, type: 'text' })
category?: string
@Column({ nullable: true, type: 'text' })
type?: ChatflowType
@Column({ type: 'timestamp' })
@CreateDateColumn()
createdDate: Date
@@ -41,7 +47,4 @@ export class ChatFlow implements IChatFlow {
@Column({ type: 'timestamp' })
@UpdateDateColumn()
updatedDate: Date
@Column({ nullable: true, type: 'text' })
category?: string
}
@@ -26,6 +26,9 @@ export class ChatMessage implements IChatMessage {
@Column({ nullable: true, type: 'text' })
fileAnnotations?: string
@Column({ nullable: true, type: 'text' })
agentReasoning?: string
@Column({ nullable: true, type: 'text' })
fileUploads?: string
@@ -0,0 +1,12 @@
import { MigrationInterface, QueryRunner } from 'typeorm'
export class AddAgentReasoningToChatMessage1714679514451 implements MigrationInterface {
public async up(queryRunner: QueryRunner): Promise<void> {
const columnExists = await queryRunner.hasColumn('chat_message', 'agentReasoning')
if (!columnExists) queryRunner.query(`ALTER TABLE \`chat_message\` ADD COLUMN \`agentReasoning\` LONGTEXT;`)
}
public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`ALTER TABLE \`chat_message\` DROP COLUMN \`agentReasoning\`;`)
}
}
@@ -0,0 +1,12 @@
import { MigrationInterface, QueryRunner } from 'typeorm'
export class AddTypeToChatFlow1766759476232 implements MigrationInterface {
public async up(queryRunner: QueryRunner): Promise<void> {
const columnExists = await queryRunner.hasColumn('chat_flow', 'type')
if (!columnExists) queryRunner.query(`ALTER TABLE \`chat_flow\` ADD COLUMN \`type\` TEXT;`)
}
public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`ALTER TABLE \`chat_flow\` DROP COLUMN \`type\`;`)
}
}
@@ -18,6 +18,8 @@ import { AddFeedback1707213626553 } from './1707213626553-AddFeedback'
import { AddDocumentStore1711637331047 } from './1711637331047-AddDocumentStore'
import { AddLead1710832127079 } from './1710832127079-AddLead'
import { AddLeadToChatMessage1711538023578 } from './1711538023578-AddLeadToChatMessage'
import { AddAgentReasoningToChatMessage1714679514451 } from './1714679514451-AddAgentReasoningToChatMessage'
import { AddTypeToChatFlow1766759476232 } from './1766759476232-AddTypeToChatFlow'
export const mysqlMigrations = [
Init1693840429259,
@@ -32,12 +34,14 @@ export const mysqlMigrations = [
AddUsedToolsToChatMessage1699481607341,
AddCategoryToChatFlow1699900910291,
AddFileAnnotationsToChatMessage1700271021237,
AddFileUploadsToChatMessage1701788586491,
AddVariableEntity1699325775451,
AddFileUploadsToChatMessage1701788586491,
AddSpeechToText1706364937060,
AddUpsertHistoryEntity1709814301358,
AddFeedback1707213626553,
AddDocumentStore1711637331047,
AddLead1710832127079,
AddLeadToChatMessage1711538023578
AddLeadToChatMessage1711538023578,
AddAgentReasoningToChatMessage1714679514451,
AddTypeToChatFlow1766759476232
]
@@ -0,0 +1,11 @@
import { MigrationInterface, QueryRunner } from 'typeorm'
export class AddAgentReasoningToChatMessage1714679514451 implements MigrationInterface {
public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`ALTER TABLE "chat_message" ADD COLUMN IF NOT EXISTS "agentReasoning" TEXT;`)
}
public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`ALTER TABLE "chat_message" DROP COLUMN "agentReasoning";`)
}
}
@@ -0,0 +1,11 @@
import { MigrationInterface, QueryRunner } from 'typeorm'
export class AddTypeToChatFlow1766759476232 implements MigrationInterface {
public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`ALTER TABLE "chat_flow" ADD COLUMN IF NOT EXISTS "type" TEXT;`)
}
public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`ALTER TABLE "chat_flow" DROP COLUMN "type";`)
}
}
@@ -19,6 +19,8 @@ import { FieldTypes1710497452584 } from './1710497452584-FieldTypes'
import { AddDocumentStore1711637331047 } from './1711637331047-AddDocumentStore'
import { AddLead1710832137905 } from './1710832137905-AddLead'
import { AddLeadToChatMessage1711538016098 } from './1711538016098-AddLeadToChatMessage'
import { AddAgentReasoningToChatMessage1714679514451 } from './1714679514451-AddAgentReasoningToChatMessage'
import { AddTypeToChatFlow1766759476232 } from './1766759476232-AddTypeToChatFlow'
export const postgresMigrations = [
Init1693891895163,
@@ -33,13 +35,15 @@ export const postgresMigrations = [
AddUsedToolsToChatMessage1699481607341,
AddCategoryToChatFlow1699900910291,
AddFileAnnotationsToChatMessage1700271021237,
AddFileUploadsToChatMessage1701788586491,
AddVariableEntity1699325775451,
AddFileUploadsToChatMessage1701788586491,
AddSpeechToText1706364937060,
AddUpsertHistoryEntity1709814301358,
AddFeedback1707213601923,
FieldTypes1710497452584,
AddDocumentStore1711637331047,
AddLead1710832137905,
AddLeadToChatMessage1711538016098
AddLeadToChatMessage1711538016098,
AddAgentReasoningToChatMessage1714679514451,
AddTypeToChatFlow1766759476232
]
@@ -0,0 +1,11 @@
import { MigrationInterface, QueryRunner } from 'typeorm'
export class AddAgentReasoningToChatMessage1714679514451 implements MigrationInterface {
public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`ALTER TABLE "chat_message" ADD COLUMN "agentReasoning" TEXT;`)
}
public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`ALTER TABLE "chat_message" DROP COLUMN "agentReasoning";`)
}
}
@@ -0,0 +1,11 @@
import { MigrationInterface, QueryRunner } from 'typeorm'
export class AddTypeToChatFlow1766759476232 implements MigrationInterface {
public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`ALTER TABLE "chat_flow" ADD COLUMN "type" TEXT;`)
}
public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`ALTER TABLE "chat_flow" DROP COLUMN "type";`)
}
}
@@ -18,6 +18,8 @@ import { AddFeedback1707213619308 } from './1707213619308-AddFeedback'
import { AddDocumentStore1711637331047 } from './1711637331047-AddDocumentStore'
import { AddLead1710832117612 } from './1710832117612-AddLead'
import { AddLeadToChatMessage1711537986113 } from './1711537986113-AddLeadToChatMessage'
import { AddAgentReasoningToChatMessage1714679514451 } from './1714679514451-AddAgentReasoningToChatMessage'
import { AddTypeToChatFlow1766759476232 } from './1766759476232-AddTypeToChatFlow'
export const sqliteMigrations = [
Init1693835579790,
@@ -32,12 +34,14 @@ export const sqliteMigrations = [
AddUsedToolsToChatMessage1699481607341,
AddCategoryToChatFlow1699900910291,
AddFileAnnotationsToChatMessage1700271021237,
AddFileUploadsToChatMessage1701788586491,
AddVariableEntity1699325775451,
AddFileUploadsToChatMessage1701788586491,
AddSpeechToText1706364937060,
AddUpsertHistoryEntity1709814301358,
AddFeedback1707213619308,
AddDocumentStore1711637331047,
AddLead1710832117612,
AddLeadToChatMessage1711537986113
AddLeadToChatMessage1711537986113,
AddAgentReasoningToChatMessage1714679514451,
AddTypeToChatFlow1766759476232
]
@@ -9,6 +9,7 @@ router.post(['/', '/:id'], chatMessageController.createChatMessage)
router.get(['/', '/:id'], chatMessageController.getAllChatMessages)
// UPDATE
router.put(['/abort/', '/abort/:chatflowid/:chatid'], chatMessageController.abortChatMessage)
// DELETE
router.delete(['/', '/:id'], chatMessageController.removeAllChatMessages)
+1 -1
View File
@@ -46,7 +46,7 @@ const deleteApiKey = async (id: string) => {
}
}
const verifyApiKey = async (paramApiKey: string): Promise<any> => {
const verifyApiKey = async (paramApiKey: string): Promise<string> => {
try {
const apiKey = await getApiKey(paramApiKey)
if (!apiKey) {
@@ -1,4 +1,4 @@
import { FindOptionsWhere } from 'typeorm'
import { DeleteResult, FindOptionsWhere } from 'typeorm'
import { StatusCodes } from 'http-status-codes'
import { chatType, IChatMessage } from '../../Interface'
import { utilGetChatMessage } from '../../utils/getChatMessage'
@@ -36,7 +36,7 @@ const getAllChatMessages = async (
endDate?: string,
messageId?: string,
feedback?: boolean
): Promise<any> => {
): Promise<ChatMessage[]> => {
try {
const dbResponse = await utilGetChatMessage(
chatflowId,
@@ -71,7 +71,7 @@ const getAllInternalChatMessages = async (
endDate?: string,
messageId?: string,
feedback?: boolean
): Promise<any> => {
): Promise<ChatMessage[]> => {
try {
const dbResponse = await utilGetChatMessage(
chatflowId,
@@ -94,7 +94,11 @@ const getAllInternalChatMessages = async (
}
}
const removeAllChatMessages = async (chatId: string, chatflowid: string, deleteOptions: FindOptionsWhere<ChatMessage>): Promise<any> => {
const removeAllChatMessages = async (
chatId: string,
chatflowid: string,
deleteOptions: FindOptionsWhere<ChatMessage>
): Promise<DeleteResult> => {
try {
const appServer = getRunningExpressApp()
@@ -120,9 +124,32 @@ const removeAllChatMessages = async (chatId: string, chatflowid: string, deleteO
}
}
const abortChatMessage = async (chatId: string, chatflowid: string) => {
try {
const appServer = getRunningExpressApp()
const endingNodeData = appServer.chatflowPool.activeChatflows[`${chatflowid}_${chatId}`]?.endingNodeData as any
if (endingNodeData && endingNodeData.signal) {
try {
endingNodeData.signal.abort()
await appServer.chatflowPool.remove(`${chatflowid}_${chatId}`)
} catch (e) {
logger.error(`[server]: Error aborting chat message for ${chatflowid}, chatId ${chatId}: ${e}`)
}
}
} catch (error) {
throw new InternalFlowiseError(
StatusCodes.INTERNAL_SERVER_ERROR,
`Error: chatMessagesService.abortChatMessage - ${getErrorMessage(error)}`
)
}
}
export default {
createChatMessage,
getAllChatMessages,
getAllInternalChatMessages,
removeAllChatMessages
removeAllChatMessages,
abortChatMessage
}
@@ -1,7 +1,7 @@
import { StatusCodes } from 'http-status-codes'
import { InternalFlowiseError } from '../../errors/internalFlowiseError'
import { getRunningExpressApp } from '../../utils/getRunningExpressApp'
import { IChatFlow } from '../../Interface'
import { ChatflowType, IChatFlow } from '../../Interface'
import { ChatFlow } from '../../database/entities/ChatFlow'
import { getAppVersion, getTelemetryFlowObj, isFlowValidForStream, constructGraphs, getEndingNodes } from '../../utils'
import logger from '../../utils/logger'
@@ -47,6 +47,11 @@ const checkIfChatflowIsValidForStreaming = async (chatflowId: string): Promise<a
isStreaming = isFlowValidForStream(nodes, endingNodeData)
}
// If it is a Multi Agents, always enable streaming
if (endingNodes.filter((node) => node.data.category === 'Multi Agents').length > 0) {
return { isStreaming: true }
}
const dbResponse = { isStreaming: isStreaming }
return dbResponse
} catch (error) {
@@ -99,11 +104,14 @@ const deleteChatflow = async (chatflowId: string): Promise<any> => {
}
}
const getAllChatflows = async (): Promise<IChatFlow[]> => {
const getAllChatflows = async (type?: ChatflowType): Promise<IChatFlow[]> => {
try {
const appServer = getRunningExpressApp()
const dbResponse = await appServer.AppDataSource.getRepository(ChatFlow).find()
return dbResponse
if (type === 'MULTIAGENT') {
return dbResponse.filter((chatflow) => chatflow.type === type)
}
return dbResponse.filter((chatflow) => chatflow.type === 'CHATFLOW' || !chatflow.type)
} catch (error) {
throw new InternalFlowiseError(
StatusCodes.INTERNAL_SERVER_ERROR,
@@ -114,6 +122,7 @@ const getAllChatflows = async (): Promise<IChatFlow[]> => {
const getChatflowByApiKey = async (apiKeyId: string): Promise<any> => {
try {
// Here we only get chatflows that are bounded by the apikeyid and chatflows that are not bounded by any apikey
const appServer = getRunningExpressApp()
const dbResponse = await appServer.AppDataSource.getRepository(ChatFlow)
.createQueryBuilder('cf')
@@ -44,6 +44,25 @@ const getAllTemplates = async () => {
}
templates.push(template)
})
marketplaceDir = path.join(__dirname, '..', '..', '..', 'marketplaces', 'agentflows')
jsonsInDir = fs.readdirSync(marketplaceDir).filter((file) => path.extname(file) === '.json')
jsonsInDir.forEach((file, index) => {
const filePath = path.join(__dirname, '..', '..', '..', 'marketplaces', 'agentflows', file)
const fileData = fs.readFileSync(filePath)
const fileDataObj = JSON.parse(fileData.toString())
const template = {
id: index,
templateName: file.split('.json')[0],
flowData: fileData.toString(),
badge: fileDataObj?.badge,
framework: fileDataObj?.framework,
categories: fileDataObj?.categories,
type: 'Agentflow',
description: fileDataObj?.description || ''
}
templates.push(template)
})
const sortedTemplates = templates.sort((a, b) => a.templateName.localeCompare(b.templateName))
const FlowiseDocsQnAIndex = sortedTemplates.findIndex((tmp) => tmp.templateName === 'Flowise Docs QnA')
if (FlowiseDocsQnAIndex > 0) {
@@ -0,0 +1,345 @@
import {
ICommonObject,
IMultiAgentNode,
IAgentReasoning,
ITeamState,
ConsoleCallbackHandler,
additionalCallbacks
} from 'flowise-components'
import { IChatFlow, IComponentNodes, IDepthQueue, IReactFlowNode, IReactFlowObject } from '../Interface'
import { Server } from 'socket.io'
import { buildFlow, getStartingNodes, getEndingNodes, constructGraphs, databaseEntities } from '../utils'
import { getRunningExpressApp } from './getRunningExpressApp'
import logger from './logger'
import { StateGraph, END } from '@langchain/langgraph'
import { BaseMessage, HumanMessage } from '@langchain/core/messages'
import { cloneDeep, flatten } from 'lodash'
import { replaceInputsWithConfig, resolveVariables } from '.'
import { StatusCodes } from 'http-status-codes'
import { InternalFlowiseError } from '../errors/internalFlowiseError'
import { getErrorMessage } from '../errors/utils'
/**
* Build Agent Graph
* @param {IChatFlow} chatflow
* @param {string} chatId
* @param {string} sessionId
* @param {ICommonObject} incomingInput
* @param {string} baseURL
* @param {Server} socketIO
*/
export const buildAgentGraph = async (
chatflow: IChatFlow,
chatId: string,
sessionId: string,
incomingInput: ICommonObject,
baseURL?: string,
socketIO?: Server
): Promise<any> => {
try {
const appServer = getRunningExpressApp()
const chatflowid = chatflow.id
/*** Get chatflows and prepare data ***/
const flowData = chatflow.flowData
const parsedFlowData: IReactFlowObject = JSON.parse(flowData)
const nodes = parsedFlowData.nodes
const edges = parsedFlowData.edges
/*** Get Ending Node with Directed Graph ***/
const { graph, nodeDependencies } = constructGraphs(nodes, edges)
const directedGraph = graph
const endingNodes = getEndingNodes(nodeDependencies, directedGraph, nodes)
/*** Get Starting Nodes with Reversed Graph ***/
const constructedObj = constructGraphs(nodes, edges, { isReversed: true })
const nonDirectedGraph = constructedObj.graph
let startingNodeIds: string[] = []
let depthQueue: IDepthQueue = {}
const endingNodeIds = endingNodes.map((n) => n.id)
for (const endingNodeId of endingNodeIds) {
const resx = getStartingNodes(nonDirectedGraph, endingNodeId)
startingNodeIds.push(...resx.startingNodeIds)
depthQueue = Object.assign(depthQueue, resx.depthQueue)
}
startingNodeIds = [...new Set(startingNodeIds)]
// Initialize nodes like ChatModels, Tools, etc.
const reactFlowNodes = await buildFlow(
startingNodeIds,
nodes,
edges,
graph,
depthQueue,
appServer.nodesPool.componentNodes,
incomingInput.question,
[],
chatId,
sessionId,
chatflowid,
appServer.AppDataSource,
incomingInput?.overrideConfig,
appServer.cachePool,
false,
undefined,
incomingInput.uploads,
baseURL
)
const options = {
chatId,
sessionId,
chatflowid,
logger,
analytic: chatflow.analytic,
appDataSource: appServer.AppDataSource,
databaseEntities: databaseEntities,
cachePool: appServer.cachePool,
uploads: incomingInput.uploads,
baseURL,
signal: new AbortController()
}
let streamResults
let finalResult = ''
let agentReasoning: IAgentReasoning[] = []
const workerNodes: IReactFlowNode[] = reactFlowNodes.filter((node: IReactFlowNode) => node.data.name === 'worker')
const supervisorNodes: IReactFlowNode[] = reactFlowNodes.filter((node: IReactFlowNode) => node.data.name === 'supervisor')
const mapNameToLabel: Record<string, string> = {}
for (const node of [...workerNodes, ...supervisorNodes]) {
mapNameToLabel[node.data.instance.name] = node.data.instance.label
}
try {
streamResults = await compileGraph(
chatflow,
mapNameToLabel,
reactFlowNodes,
endingNodeIds,
appServer.nodesPool.componentNodes,
options,
startingNodeIds,
incomingInput.question,
incomingInput?.overrideConfig
)
if (streamResults) {
let isStreamingStarted = false
for await (const output of await streamResults) {
if (!output?.__end__) {
const agentName = Object.keys(output)[0]
const usedTools = output[agentName]?.messages
? output[agentName].messages.map((msg: any) => msg.additional_kwargs?.usedTools)
: []
const sourceDocuments = output[agentName]?.messages
? output[agentName].messages.map((msg: any) => msg.additional_kwargs?.sourceDocuments)
: []
const messages = output[agentName]?.messages ? output[agentName].messages.map((msg: any) => msg.content) : []
const reasoning = {
agentName: mapNameToLabel[agentName],
messages,
next: output[agentName]?.next,
instructions: output[agentName]?.instructions,
usedTools: flatten(usedTools),
sourceDocuments: flatten(sourceDocuments)
}
agentReasoning.push(reasoning)
if (socketIO && incomingInput.socketIOClientId) {
if (!isStreamingStarted) {
isStreamingStarted = true
socketIO.to(incomingInput.socketIOClientId).emit('start', JSON.stringify(agentReasoning))
}
socketIO.to(incomingInput.socketIOClientId).emit('agentReasoning', JSON.stringify(agentReasoning))
// Send loading next agent indicator
if (reasoning.next && reasoning.next !== 'FINISH' && reasoning.next !== 'END') {
socketIO
.to(incomingInput.socketIOClientId)
.emit('nextAgent', mapNameToLabel[reasoning.next] || reasoning.next)
}
}
} else {
finalResult = output.__end__.messages.length ? output.__end__.messages.pop()?.content : ''
if (Array.isArray(finalResult)) finalResult = output.__end__.instructions
if (finalResult === incomingInput.question) {
const supervisorNode = reactFlowNodes.find((node: IReactFlowNode) => node.data.name === 'supervisor')
const llm = supervisorNode?.data?.instance?.llm
if (llm) {
const res = await llm.invoke(incomingInput.question)
finalResult = res?.content
}
}
if (socketIO && incomingInput.socketIOClientId) {
socketIO.to(incomingInput.socketIOClientId).emit('token', finalResult)
}
}
}
return { finalResult, agentReasoning }
}
} catch (e) {
if (socketIO && incomingInput.socketIOClientId) {
socketIO.to(incomingInput.socketIOClientId).emit('abort')
}
return { finalResult, agentReasoning }
}
return streamResults
} catch (e) {
logger.error('[server]: Error:', e)
throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, `Error buildAgentGraph - ${getErrorMessage(e)}`)
}
}
/**
* Compile Graph
* @param {IChatFlow} chatflow
* @param {Record<string, string>} mapNameToLabel
* @param {IReactFlowNode[]} reactflowNodes
* @param {string[]} workerNodeIds
* @param {IComponentNodes} componentNodes
* @param {ICommonObject} options
* @param {string[]} startingNodeIds
* @param {string} question
* @param {ICommonObject} overrideConfig
*/
const compileGraph = async (
chatflow: IChatFlow,
mapNameToLabel: Record<string, string>,
reactflowNodes: IReactFlowNode[] = [],
workerNodeIds: string[],
componentNodes: IComponentNodes,
options: ICommonObject,
startingNodeIds: string[],
question: string,
overrideConfig?: ICommonObject
) => {
const appServer = getRunningExpressApp()
const channels: ITeamState = {
messages: {
value: (x: BaseMessage[], y: BaseMessage[]) => x.concat(y),
default: () => []
},
next: 'initialState',
instructions: "Solve the user's request.",
team_members: []
}
const workflowGraph = new StateGraph<ITeamState>({
//@ts-ignore
channels
})
const workerNodes = reactflowNodes.filter((node) => workerNodeIds.includes(node.data.id))
let supervisorWorkers: { [key: string]: IMultiAgentNode[] } = {}
// Init worker nodes
for (const workerNode of workerNodes) {
const nodeInstanceFilePath = componentNodes[workerNode.data.name].filePath as string
const nodeModule = await import(nodeInstanceFilePath)
const newNodeInstance = new nodeModule.nodeClass()
let flowNodeData = cloneDeep(workerNode.data)
if (overrideConfig) flowNodeData = replaceInputsWithConfig(flowNodeData, overrideConfig)
flowNodeData = resolveVariables(flowNodeData, reactflowNodes, question, [])
try {
const workerResult: IMultiAgentNode = await newNodeInstance.init(flowNodeData, question, options)
const parentSupervisor = workerResult.parentSupervisorName
if (!parentSupervisor || workerResult.type !== 'worker') continue
if (Object.prototype.hasOwnProperty.call(supervisorWorkers, parentSupervisor)) {
supervisorWorkers[parentSupervisor].push(workerResult)
} else {
supervisorWorkers[parentSupervisor] = [workerResult]
}
workflowGraph.addNode(workerResult.name, workerResult.node)
} catch (e) {
throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, `Error initialize worker nodes - ${getErrorMessage(e)}`)
}
}
// Init supervisor nodes
for (const supervisor in supervisorWorkers) {
const supervisorInputLabel = mapNameToLabel[supervisor]
const supervisorNode = reactflowNodes.find((node) => supervisorInputLabel === node.data.inputs?.supervisorName)
if (!supervisorNode) continue
const nodeInstanceFilePath = componentNodes[supervisorNode.data.name].filePath as string
const nodeModule = await import(nodeInstanceFilePath)
const newNodeInstance = new nodeModule.nodeClass()
let flowNodeData = cloneDeep(supervisorNode.data)
if (overrideConfig) flowNodeData = replaceInputsWithConfig(flowNodeData, overrideConfig)
flowNodeData = resolveVariables(flowNodeData, reactflowNodes, question, [])
if (flowNodeData.inputs) flowNodeData.inputs.workerNodes = supervisorWorkers[supervisor]
try {
const supervisorResult: IMultiAgentNode = await newNodeInstance.init(flowNodeData, question, options)
if (!supervisorResult.workers?.length) continue
if (supervisorResult.moderations && supervisorResult.moderations.length > 0) {
try {
for (const moderation of supervisorResult.moderations) {
question = await moderation.checkForViolations(question)
}
} catch (e) {
throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, getErrorMessage(e))
}
}
workflowGraph.addNode(supervisorResult.name, supervisorResult.node)
for (const worker of supervisorResult.workers) {
workflowGraph.addEdge(worker, supervisorResult.name)
}
let conditionalEdges: { [key: string]: string } = {}
for (let i = 0; i < supervisorResult.workers.length; i++) {
conditionalEdges[supervisorResult.workers[i]] = supervisorResult.workers[i]
}
workflowGraph.addConditionalEdges(supervisorResult.name, (x: ITeamState) => x.next, {
...conditionalEdges,
FINISH: END
})
workflowGraph.setEntryPoint(supervisorResult.name)
// Add agentflow to pool
;(workflowGraph as any).signal = options.signal
appServer.chatflowPool.add(
`${chatflow.id}_${options.chatId}`,
workflowGraph as any,
reactflowNodes.filter((node) => startingNodeIds.includes(node.id)),
overrideConfig
)
// TODO: add persistence
// const memory = new MemorySaver()
const graph = workflowGraph.compile()
const loggerHandler = new ConsoleCallbackHandler(logger)
const callbacks = await additionalCallbacks(flowNodeData, options)
// Return stream result as we should only have 1 supervisor
return await graph.stream(
{
messages: [new HumanMessage({ content: question })]
},
{ recursionLimit: supervisorResult?.recursionLimit ?? 100, callbacks: [loggerHandler, ...callbacks] }
)
} catch (e) {
throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, `Error initialize supervisor nodes - ${getErrorMessage(e)}`)
}
}
}
+127 -14
View File
@@ -1,7 +1,18 @@
import { Request } from 'express'
import { IFileUpload, convertSpeechToText, ICommonObject, addSingleFileToStorage, addArrayFilesToStorage } from 'flowise-components'
import { StatusCodes } from 'http-status-codes'
import { IncomingInput, IMessage, INodeData, IReactFlowObject, IReactFlowNode, IDepthQueue, chatType, IChatMessage } from '../Interface'
import {
IncomingInput,
IMessage,
INodeData,
IReactFlowObject,
IReactFlowNode,
IDepthQueue,
chatType,
IChatMessage,
IChatFlow,
IReactFlowEdge
} from '../Interface'
import { InternalFlowiseError } from '../errors/internalFlowiseError'
import { ChatFlow } from '../database/entities/ChatFlow'
import { Server } from 'socket.io'
@@ -30,6 +41,8 @@ import { omit } from 'lodash'
import * as fs from 'fs'
import logger from './logger'
import { utilAddChatMessage } from './addChatMesage'
import { buildAgentGraph } from './buildAgentGraph'
import { getErrorMessage } from '../errors/utils'
/**
* Build Chatflow
@@ -41,6 +54,8 @@ export const utilBuildChatflow = async (req: Request, socketIO?: Server, isInter
try {
const appServer = getRunningExpressApp()
const chatflowid = req.params.id
const baseURL = `${req.protocol}://${req.get('host')}`
let incomingInput: IncomingInput = req.body
let nodeToExecuteData: INodeData
const chatflow = await appServer.AppDataSource.getRepository(ChatFlow).findOneBy({
@@ -140,11 +155,34 @@ export const utilBuildChatflow = async (req: Request, socketIO?: Server, isInter
const nodes = parsedFlowData.nodes
const edges = parsedFlowData.edges
// Get session ID
/*** Get session ID ***/
const memoryNode = findMemoryNode(nodes, edges)
const memoryType = memoryNode?.data.label
let sessionId = getMemorySessionId(memoryNode, incomingInput, chatId, isInternal)
/*** Get Ending Node with Directed Graph ***/
const { graph, nodeDependencies } = constructGraphs(nodes, edges)
const directedGraph = graph
const endingNodes = getEndingNodes(nodeDependencies, directedGraph, nodes)
/*** If the graph is an agent graph, build the agent response ***/
if (endingNodes.filter((node) => node.data.category === 'Multi Agents').length) {
return await utilBuildAgentResponse(
chatflow,
isInternal,
chatId,
memoryType ?? '',
sessionId,
userMessageDateTime,
fileUploads,
incomingInput,
nodes,
edges,
socketIO,
baseURL
)
}
// Get prepend messages
const prependMessages = incomingInput.history
@@ -153,7 +191,6 @@ export const utilBuildChatflow = async (req: Request, socketIO?: Server, isInter
* - Still in sync (i.e the flow has not been modified since)
* - Existing overrideConfig and new overrideConfig are the same
* - Flow doesn't start with/contain nodes that depend on incomingInput.question
* TODO: convert overrideConfig to hash when we no longer store base64 string but filepath
***/
const isFlowReusable = () => {
return (
@@ -176,13 +213,7 @@ export const utilBuildChatflow = async (req: Request, socketIO?: Server, isInter
`[server]: Reuse existing chatflow ${chatflowid} with ending node ${nodeToExecuteData.label} (${nodeToExecuteData.id})`
)
} else {
/*** Get Ending Node with Directed Graph ***/
const { graph, nodeDependencies } = constructGraphs(nodes, edges)
const directedGraph = graph
const endingNodes = getEndingNodes(nodeDependencies, directedGraph, nodes)
let isCustomFunctionEndingNode = endingNodes.some((node) => node.data?.outputs?.output === 'EndingNode')
const isCustomFunctionEndingNode = endingNodes.some((node) => node.data?.outputs?.output === 'EndingNode')
for (const endingNode of endingNodes) {
const endingNodeData = endingNode.data
@@ -268,7 +299,10 @@ export const utilBuildChatflow = async (req: Request, socketIO?: Server, isInter
appServer.cachePool,
false,
undefined,
incomingInput.uploads
incomingInput.uploads,
baseURL,
socketIO,
incomingInput.socketIOClientId
)
const nodeToExecute =
@@ -372,13 +406,92 @@ export const utilBuildChatflow = async (req: Request, socketIO?: Server, isInter
// this is used when input text is empty but question is in audio format
result.question = incomingInput.question
result.chatId = chatId
result.chatMessageId = chatMessage.id
result.chatMessageId = chatMessage?.id
if (sessionId) result.sessionId = sessionId
if (memoryType) result.memoryType = memoryType
return result
} catch (e: any) {
} catch (e) {
logger.error('[server]: Error:', e)
throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, e.message)
throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, getErrorMessage(e))
}
}
const utilBuildAgentResponse = async (
chatflow: IChatFlow,
isInternal: boolean,
chatId: string,
memoryType: string,
sessionId: string,
userMessageDateTime: Date,
fileUploads: IFileUpload[],
incomingInput: ICommonObject,
nodes: IReactFlowNode[],
edges: IReactFlowEdge[],
socketIO?: Server,
baseURL?: string
) => {
try {
const appServer = getRunningExpressApp()
const streamResults = await buildAgentGraph(chatflow, chatId, sessionId, incomingInput, baseURL, socketIO)
if (streamResults) {
const { finalResult, agentReasoning } = streamResults
const userMessage: Omit<IChatMessage, 'id'> = {
role: 'userMessage',
content: incomingInput.question,
chatflowid: chatflow.id,
chatType: isInternal ? chatType.INTERNAL : chatType.EXTERNAL,
chatId,
memoryType,
sessionId,
createdDate: userMessageDateTime,
fileUploads: incomingInput.uploads ? JSON.stringify(fileUploads) : undefined,
leadEmail: incomingInput.leadEmail
}
await utilAddChatMessage(userMessage)
const apiMessage: Omit<IChatMessage, 'id' | 'createdDate'> = {
role: 'apiMessage',
content: finalResult,
chatflowid: chatflow.id,
chatType: isInternal ? chatType.INTERNAL : chatType.EXTERNAL,
chatId,
memoryType,
sessionId
}
if (agentReasoning.length) apiMessage.agentReasoning = JSON.stringify(agentReasoning)
const chatMessage = await utilAddChatMessage(apiMessage)
await appServer.telemetry.sendTelemetry('prediction_sent', {
version: await getAppVersion(),
chatlowId: chatflow.id,
chatId,
type: isInternal ? chatType.INTERNAL : chatType.EXTERNAL,
flowGraph: getTelemetryFlowObj(nodes, edges)
})
// Prepare response
let result: ICommonObject = {}
result.text = finalResult
result.question = incomingInput.question
result.chatId = chatId
result.chatMessageId = chatMessage?.id
if (sessionId) result.sessionId = sessionId
if (memoryType) result.memoryType = memoryType
if (agentReasoning.length) result.agentReasoning = agentReasoning
await appServer.telemetry.sendTelemetry('graph_compiled', {
version: await getAppVersion(),
graphId: chatflow.id,
type: isInternal ? chatType.INTERNAL : chatType.EXTERNAL,
flowGraph: getTelemetryFlowObj(nodes, edges)
})
return result
}
return undefined
} catch (e) {
logger.error('[server]: Error:', e)
throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, getErrorMessage(e))
}
}
@@ -18,7 +18,7 @@ export const utilGetUploadsConfig = async (chatflowid: string): Promise<any> =>
throw new InternalFlowiseError(StatusCodes.NOT_FOUND, `Chatflow ${chatflowid} not found`)
}
const uploadAllowedNodes = ['llmChain', 'conversationChain', 'reactAgentChat', 'conversationalAgent', 'toolAgent']
const uploadAllowedNodes = ['llmChain', 'conversationChain', 'reactAgentChat', 'conversationalAgent', 'toolAgent', 'supervisor']
const uploadProcessingNodes = ['chatOpenAI', 'chatAnthropic', 'awsChatBedrock', 'azureChatOpenAI', 'chatGoogleGenerativeAI']
const flowObj = JSON.parse(chatflow.flowData)
+16 -6
View File
@@ -1,6 +1,7 @@
import path from 'path'
import fs from 'fs'
import logger from './logger'
import { Server } from 'socket.io'
import {
IComponentCredentials,
IComponentNodes,
@@ -267,9 +268,10 @@ export const getEndingNodes = (
endingNodeData &&
endingNodeData.category !== 'Chains' &&
endingNodeData.category !== 'Agents' &&
endingNodeData.category !== 'Engine'
endingNodeData.category !== 'Engine' &&
endingNodeData.category !== 'Multi Agents'
) {
error = new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, `Ending node must be either a Chain or Agent`)
error = new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, `Ending node must be either a Chain or Agent or Engine`)
continue
}
}
@@ -443,7 +445,10 @@ export const buildFlow = async (
cachePool?: CachePool,
isUpsert?: boolean,
stopNodeId?: string,
uploads?: IFileUpload[]
uploads?: IFileUpload[],
baseURL?: string,
socketIO?: Server,
socketIOClientId?: string
) => {
const flowNodes = cloneDeep(reactFlowNodes)
@@ -496,7 +501,10 @@ export const buildFlow = async (
databaseEntities,
cachePool,
dynamicVariables,
uploads
uploads,
baseURL,
socketIO,
socketIOClientId
})
if (indexResult) upsertHistory['result'] = indexResult
logger.debug(`[server]: Finished upserting ${reactFlowNode.data.label} (${reactFlowNode.data.id})`)
@@ -520,7 +528,10 @@ export const buildFlow = async (
cachePool,
isUpsert,
dynamicVariables,
uploads
uploads,
baseURL,
socketIO,
socketIOClientId
})
// Save dynamic variables
@@ -1048,7 +1059,6 @@ export const findAvailableConfigs = (reactFlowNodes: IReactFlowNode[], component
}
}
}
return configs
}