Chore/API for AgentflowV2 (#4696)

* Enhancement: Introduce prepended chat history handling in Agent and LLM nodes.

- Added support for `prependedChatHistory` in both `Agent` and `LLM` classes to allow for initial message context.
- Implemented validation for history schema in execution flow to ensure proper format.
- Refactored utility functions to include JSON sanitization and validation methods for improved data handling.

* update prediction swagger
This commit is contained in:
Henry Heng
2025-06-22 13:16:35 +01:00
committed by GitHub
parent 035b5555a9
commit 543800562e
9 changed files with 426 additions and 89 deletions
+87 -26
View File
@@ -41,7 +41,9 @@ import {
getStartingNode,
getTelemetryFlowObj,
QUESTION_VAR_PREFIX,
CURRENT_DATE_TIME_VAR_PREFIX
CURRENT_DATE_TIME_VAR_PREFIX,
_removeCredentialId,
validateHistorySchema
} from '.'
import { ChatFlow } from '../database/entities/ChatFlow'
import { Variable } from '../database/entities/Variable'
@@ -105,6 +107,7 @@ interface IExecuteNodeParams {
evaluationRunId?: string
isInternal: boolean
pastChatHistory: IMessage[]
prependedChatHistory: IMessage[]
appDataSource: DataSource
usageCacheManager: UsageCacheManager
telemetry: Telemetry
@@ -203,21 +206,6 @@ const updateExecution = async (appDataSource: DataSource, executionId: string, w
await appDataSource.getRepository(Execution).save(execution)
}
export const _removeCredentialId = (obj: any): any => {
if (!obj || typeof obj !== 'object') return obj
if (Array.isArray(obj)) {
return obj.map((item) => _removeCredentialId(item))
}
const newObj: Record<string, any> = {}
for (const [key, value] of Object.entries(obj)) {
if (key === 'FLOWISE_CREDENTIAL_ID') continue
newObj[key] = _removeCredentialId(value)
}
return newObj
}
export const resolveVariables = async (
reactFlowNodeData: INodeData,
question: string,
@@ -820,6 +808,7 @@ const executeNode = async ({
evaluationRunId,
parentExecutionId,
pastChatHistory,
prependedChatHistory,
appDataSource,
usageCacheManager,
telemetry,
@@ -927,6 +916,7 @@ const executeNode = async ({
humanInputAction = lastNodeOutput?.humanInputAction
}
// This is when human in the loop is resumed
if (humanInput && nodeId === humanInput.startNodeId) {
reactFlowNodeData.inputs = { ...reactFlowNodeData.inputs, humanInput }
// Remove the stopped humanInput from execution data
@@ -973,6 +963,7 @@ const executeNode = async ({
isLastNode,
sseStreamer,
pastChatHistory,
prependedChatHistory,
agentflowRuntime,
abortController,
analyticHandlers,
@@ -1297,6 +1288,17 @@ export const executeAgentFlow = async ({
const chatflowid = chatflow.id
const sessionId = incomingInput.sessionId ?? chatId
const humanInput: IHumanInput | undefined = incomingInput.humanInput
// Validate history schema if provided
if (incomingInput.history && incomingInput.history.length > 0) {
if (!validateHistorySchema(incomingInput.history)) {
throw new Error(
'Invalid history format. Each history item must have: ' + '{ role: "apiMessage" | "userMessage", content: string }'
)
}
}
const prependedChatHistory = incomingInput.history ?? []
const apiMessageId = uuidv4()
/*** Get chatflows and prepare data ***/
@@ -1413,35 +1415,90 @@ export const executeAgentFlow = async ({
}
// If it is human input, find the last checkpoint and resume
if (humanInput?.startNodeId) {
if (humanInput) {
if (!previousExecution) {
throw new Error(`No previous execution found for session ${sessionId}`)
}
if (previousExecution.state !== 'STOPPED') {
let executionData = JSON.parse(previousExecution.executionData) as IAgentflowExecutedData[]
let shouldUpdateExecution = false
// Handle different execution states
if (previousExecution.state === 'STOPPED') {
// Normal case - execution is stopped and ready to resume
logger.debug(` ✅ Previous execution is in STOPPED state, ready to resume`)
} else if (previousExecution.state === 'ERROR') {
// Check if second-to-last execution item is STOPPED and last is ERROR
if (executionData.length >= 2) {
const lastItem = executionData[executionData.length - 1]
const secondLastItem = executionData[executionData.length - 2]
if (lastItem.status === 'ERROR' && secondLastItem.status === 'STOPPED') {
logger.debug(` 🔄 Found ERROR after STOPPED - removing last error item to allow retry`)
logger.debug(` Removing: ${lastItem.nodeId} (${lastItem.nodeLabel}) - ${lastItem.data?.error || 'Unknown error'}`)
// Remove the last ERROR item
executionData = executionData.slice(0, -1)
shouldUpdateExecution = true
} else {
throw new Error(
`Cannot resume execution ${previousExecution.id} because it is in 'ERROR' state ` +
`and the previous item is not in 'STOPPED' state. Only executions that ended with a ` +
`STOPPED state (or ERROR after STOPPED) can be resumed.`
)
}
} else {
throw new Error(
`Cannot resume execution ${previousExecution.id} because it is in 'ERROR' state ` +
`with insufficient execution data. Only executions in 'STOPPED' state can be resumed.`
)
}
} else {
throw new Error(
`Cannot resume execution ${previousExecution.id} because it is in '${previousExecution.state}' state. ` +
`Only executions in 'STOPPED' state can be resumed.`
`Only executions in 'STOPPED' state (or 'ERROR' after 'STOPPED') can be resumed.`
)
}
startingNodeIds.push(humanInput.startNodeId)
checkForMultipleStartNodes(startingNodeIds, isRecursive, nodes)
let startNodeId = humanInput.startNodeId
const executionData = JSON.parse(previousExecution.executionData) as IAgentflowExecutedData[]
// If startNodeId is not provided, find the last node with STOPPED status from execution data
if (!startNodeId) {
// Search in reverse order to find the last (most recent) STOPPED node
const stoppedNode = [...executionData].reverse().find((data) => data.status === 'STOPPED')
// Verify that the humanInputAgentflow node exists in previous execution
const humanInputNodeExists = executionData.some((data) => data.nodeId === humanInput.startNodeId)
if (!stoppedNode) {
throw new Error('No stopped node found in previous execution data to resume from')
}
if (!humanInputNodeExists) {
startNodeId = stoppedNode.nodeId
logger.debug(` 🔍 Auto-detected stopped node to resume from: ${startNodeId} (${stoppedNode.nodeLabel})`)
}
// Verify that the node exists in previous execution
const nodeExists = executionData.some((data) => data.nodeId === startNodeId)
if (!nodeExists) {
throw new Error(
`Human Input node ${humanInput.startNodeId} not found in previous execution. ` +
`Node ${startNodeId} not found in previous execution. ` +
`This could indicate an invalid resume attempt or a modified flow.`
)
}
startingNodeIds.push(startNodeId)
checkForMultipleStartNodes(startingNodeIds, isRecursive, nodes)
agentFlowExecutedData.push(...executionData)
// Update execution data if we removed an error item
if (shouldUpdateExecution) {
logger.debug(` 📝 Updating execution data after removing error item`)
await updateExecution(appDataSource, previousExecution.id, workspaceId, {
executionData: JSON.stringify(executionData),
state: 'INPROGRESS'
})
}
// Get last state
const lastState = executionData[executionData.length - 1].data.state
@@ -1454,6 +1511,9 @@ export const executeAgentFlow = async ({
})
newExecution = previousExecution
parentExecutionId = previousExecution.id
// Update humanInput with the resolved startNodeId
humanInput.startNodeId = startNodeId
} else if (isRecursive && parentExecutionId) {
const { startingNodeIds: startingNodeIdsFromFlow } = getStartingNode(nodeDependencies)
startingNodeIds.push(...startingNodeIdsFromFlow)
@@ -1604,6 +1664,7 @@ export const executeAgentFlow = async ({
parentExecutionId,
isInternal,
pastChatHistory,
prependedChatHistory,
appDataSource,
usageCacheManager,
telemetry,