mirror of
https://github.com/farcasclaudiu/Flowise.git
synced 2026-06-28 17:01:00 +03:00
feat: Add structured JSON output support to Agent Node (#5470)
* feat: Add structured JSON output support to Agent Node - Add agentStructuredOutput input parameter matching LLM Node structure - Implement configureStructuredOutput method to convert schema to Zod - Add createZodSchemaFromJSON helper for complex JSON schemas - Configure structured output before binding tools (required order) - Disable streaming when structured output is enabled - Extract structured fields in prepareOutputObject method - Resolves issue #5256 * lint fix * add structured output to Agent node * add structured output to Agent node --------- Co-authored-by: Henry <hzj94@hotmail.com>
This commit is contained in:
committed by
GitHub
parent
4d79653741
commit
1f3f7a7194
@@ -28,7 +28,13 @@ import {
|
||||
replaceBase64ImagesWithFileReferences,
|
||||
updateFlowState
|
||||
} from '../utils'
|
||||
import { convertMultiOptionsToStringArray, getCredentialData, getCredentialParam, processTemplateVariables } from '../../../src/utils'
|
||||
import {
|
||||
convertMultiOptionsToStringArray,
|
||||
getCredentialData,
|
||||
getCredentialParam,
|
||||
processTemplateVariables,
|
||||
configureStructuredOutput
|
||||
} from '../../../src/utils'
|
||||
import { addSingleFileToStorage } from '../../../src/storageUtils'
|
||||
import fetch from 'node-fetch'
|
||||
|
||||
@@ -394,6 +400,108 @@ class Agent_Agentflow implements INode {
|
||||
],
|
||||
default: 'userMessage'
|
||||
},
|
||||
{
|
||||
label: 'JSON Structured Output',
|
||||
name: 'agentStructuredOutput',
|
||||
description: 'Instruct the Agent to give output in a JSON structured schema',
|
||||
type: 'array',
|
||||
optional: true,
|
||||
acceptVariable: true,
|
||||
array: [
|
||||
{
|
||||
label: 'Key',
|
||||
name: 'key',
|
||||
type: 'string'
|
||||
},
|
||||
{
|
||||
label: 'Type',
|
||||
name: 'type',
|
||||
type: 'options',
|
||||
options: [
|
||||
{
|
||||
label: 'String',
|
||||
name: 'string'
|
||||
},
|
||||
{
|
||||
label: 'String Array',
|
||||
name: 'stringArray'
|
||||
},
|
||||
{
|
||||
label: 'Number',
|
||||
name: 'number'
|
||||
},
|
||||
{
|
||||
label: 'Boolean',
|
||||
name: 'boolean'
|
||||
},
|
||||
{
|
||||
label: 'Enum',
|
||||
name: 'enum'
|
||||
},
|
||||
{
|
||||
label: 'JSON Array',
|
||||
name: 'jsonArray'
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
label: 'Enum Values',
|
||||
name: 'enumValues',
|
||||
type: 'string',
|
||||
placeholder: 'value1, value2, value3',
|
||||
description: 'Enum values. Separated by comma',
|
||||
optional: true,
|
||||
show: {
|
||||
'agentStructuredOutput[$index].type': 'enum'
|
||||
}
|
||||
},
|
||||
{
|
||||
label: 'JSON Schema',
|
||||
name: 'jsonSchema',
|
||||
type: 'code',
|
||||
placeholder: `{
|
||||
"answer": {
|
||||
"type": "string",
|
||||
"description": "Value of the answer"
|
||||
},
|
||||
"reason": {
|
||||
"type": "string",
|
||||
"description": "Reason for the answer"
|
||||
},
|
||||
"optional": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"count": {
|
||||
"type": "number"
|
||||
},
|
||||
"children": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"value": {
|
||||
"type": "string",
|
||||
"description": "Value of the children's answer"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}`,
|
||||
description: 'JSON schema for the structured output',
|
||||
optional: true,
|
||||
hideCodeExecute: true,
|
||||
show: {
|
||||
'agentStructuredOutput[$index].type': 'jsonArray'
|
||||
}
|
||||
},
|
||||
{
|
||||
label: 'Description',
|
||||
name: 'description',
|
||||
type: 'string',
|
||||
placeholder: 'Description of the key'
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
label: 'Update Flow State',
|
||||
name: 'agentUpdateState',
|
||||
@@ -770,6 +878,7 @@ class Agent_Agentflow implements INode {
|
||||
const memoryType = nodeData.inputs?.agentMemoryType as string
|
||||
const userMessage = nodeData.inputs?.agentUserMessage as string
|
||||
const _agentUpdateState = nodeData.inputs?.agentUpdateState
|
||||
const _agentStructuredOutput = nodeData.inputs?.agentStructuredOutput
|
||||
const agentMessages = (nodeData.inputs?.agentMessages as unknown as ILLMMessage[]) ?? []
|
||||
|
||||
// Extract runtime state and history
|
||||
@@ -795,6 +904,8 @@ class Agent_Agentflow implements INode {
|
||||
const llmWithoutToolsBind = (await newLLMNodeInstance.init(newNodeData, '', options)) as BaseChatModel
|
||||
let llmNodeInstance = llmWithoutToolsBind
|
||||
|
||||
const isStructuredOutput = _agentStructuredOutput && Array.isArray(_agentStructuredOutput) && _agentStructuredOutput.length > 0
|
||||
|
||||
const agentToolsBuiltInOpenAI = convertMultiOptionsToStringArray(nodeData.inputs?.agentToolsBuiltInOpenAI)
|
||||
if (agentToolsBuiltInOpenAI && agentToolsBuiltInOpenAI.length > 0) {
|
||||
for (const tool of agentToolsBuiltInOpenAI) {
|
||||
@@ -953,7 +1064,7 @@ class Agent_Agentflow implements INode {
|
||||
// Initialize response and determine if streaming is possible
|
||||
let response: AIMessageChunk = new AIMessageChunk('')
|
||||
const isLastNode = options.isLastNode as boolean
|
||||
const isStreamable = isLastNode && options.sseStreamer !== undefined && modelConfig?.streaming !== false
|
||||
const isStreamable = isLastNode && options.sseStreamer !== undefined && modelConfig?.streaming !== false && !isStructuredOutput
|
||||
|
||||
// Start analytics
|
||||
if (analyticHandlers && options.parentTraceIds) {
|
||||
@@ -1002,7 +1113,8 @@ class Agent_Agentflow implements INode {
|
||||
llmWithoutToolsBind,
|
||||
isStreamable,
|
||||
isLastNode,
|
||||
iterationContext
|
||||
iterationContext,
|
||||
isStructuredOutput
|
||||
})
|
||||
|
||||
response = result.response
|
||||
@@ -1031,7 +1143,14 @@ class Agent_Agentflow implements INode {
|
||||
}
|
||||
} else {
|
||||
if (isStreamable) {
|
||||
response = await this.handleStreamingResponse(sseStreamer, llmNodeInstance, messages, chatId, abortController)
|
||||
response = await this.handleStreamingResponse(
|
||||
sseStreamer,
|
||||
llmNodeInstance,
|
||||
messages,
|
||||
chatId,
|
||||
abortController,
|
||||
isStructuredOutput
|
||||
)
|
||||
} else {
|
||||
response = await llmNodeInstance.invoke(messages, { signal: abortController?.signal })
|
||||
}
|
||||
@@ -1053,7 +1172,8 @@ class Agent_Agentflow implements INode {
|
||||
llmNodeInstance,
|
||||
isStreamable,
|
||||
isLastNode,
|
||||
iterationContext
|
||||
iterationContext,
|
||||
isStructuredOutput
|
||||
})
|
||||
|
||||
response = result.response
|
||||
@@ -1080,8 +1200,9 @@ class Agent_Agentflow implements INode {
|
||||
sseStreamer.streamArtifactsEvent(chatId, flatten(artifacts))
|
||||
}
|
||||
}
|
||||
} else if (!humanInput && !isStreamable && isLastNode && sseStreamer) {
|
||||
} else if (!humanInput && !isStreamable && isLastNode && sseStreamer && !isStructuredOutput) {
|
||||
// Stream whole response back to UI if not streaming and no tool calls
|
||||
// Skip this if structured output is enabled - it will be streamed after conversion
|
||||
let finalResponse = ''
|
||||
if (response.content && Array.isArray(response.content)) {
|
||||
finalResponse = response.content.map((item: any) => item.text).join('\n')
|
||||
@@ -1159,6 +1280,23 @@ class Agent_Agentflow implements INode {
|
||||
finalResponse = await this.processSandboxLinks(finalResponse, options.baseURL, options.chatflowid, chatId)
|
||||
}
|
||||
|
||||
// If is structured output, then invoke LLM again with structured output at the very end after all tool calls
|
||||
if (isStructuredOutput) {
|
||||
llmNodeInstance = configureStructuredOutput(llmNodeInstance, _agentStructuredOutput)
|
||||
const prompt = 'Convert the following response to the structured output format: ' + finalResponse
|
||||
response = await llmNodeInstance.invoke(prompt, { signal: abortController?.signal })
|
||||
|
||||
if (typeof response === 'object') {
|
||||
finalResponse = '```json\n' + JSON.stringify(response, null, 2) + '\n```'
|
||||
} else {
|
||||
finalResponse = response
|
||||
}
|
||||
|
||||
if (isLastNode && sseStreamer) {
|
||||
sseStreamer.streamTokenEvent(chatId, finalResponse)
|
||||
}
|
||||
}
|
||||
|
||||
const output = this.prepareOutputObject(
|
||||
response,
|
||||
availableTools,
|
||||
@@ -1171,7 +1309,8 @@ class Agent_Agentflow implements INode {
|
||||
artifacts,
|
||||
additionalTokens,
|
||||
isWaitingForHumanInput,
|
||||
fileAnnotations
|
||||
fileAnnotations,
|
||||
isStructuredOutput
|
||||
)
|
||||
|
||||
// End analytics tracking
|
||||
@@ -1561,13 +1700,14 @@ class Agent_Agentflow implements INode {
|
||||
llmNodeInstance: BaseChatModel,
|
||||
messages: BaseMessageLike[],
|
||||
chatId: string,
|
||||
abortController: AbortController
|
||||
abortController: AbortController,
|
||||
isStructuredOutput: boolean = false
|
||||
): Promise<AIMessageChunk> {
|
||||
let response = new AIMessageChunk('')
|
||||
|
||||
try {
|
||||
for await (const chunk of await llmNodeInstance.stream(messages, { signal: abortController?.signal })) {
|
||||
if (sseStreamer) {
|
||||
if (sseStreamer && !isStructuredOutput) {
|
||||
let content = ''
|
||||
|
||||
if (typeof chunk === 'string') {
|
||||
@@ -1610,7 +1750,8 @@ class Agent_Agentflow implements INode {
|
||||
artifacts: any[],
|
||||
additionalTokens: number = 0,
|
||||
isWaitingForHumanInput: boolean = false,
|
||||
fileAnnotations: any[] = []
|
||||
fileAnnotations: any[] = [],
|
||||
isStructuredOutput: boolean = false
|
||||
): any {
|
||||
const output: any = {
|
||||
content: finalResponse,
|
||||
@@ -1645,6 +1786,15 @@ class Agent_Agentflow implements INode {
|
||||
output.responseMetadata = response.response_metadata
|
||||
}
|
||||
|
||||
if (isStructuredOutput && typeof response === 'object') {
|
||||
const structuredOutput = response as Record<string, any>
|
||||
for (const key in structuredOutput) {
|
||||
if (structuredOutput[key] !== undefined && structuredOutput[key] !== null) {
|
||||
output[key] = structuredOutput[key]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Add used tools, source documents and artifacts to output
|
||||
if (usedTools && usedTools.length > 0) {
|
||||
output.usedTools = flatten(usedTools)
|
||||
@@ -1710,7 +1860,8 @@ class Agent_Agentflow implements INode {
|
||||
llmNodeInstance,
|
||||
isStreamable,
|
||||
isLastNode,
|
||||
iterationContext
|
||||
iterationContext,
|
||||
isStructuredOutput = false
|
||||
}: {
|
||||
response: AIMessageChunk
|
||||
messages: BaseMessageLike[]
|
||||
@@ -1724,6 +1875,7 @@ class Agent_Agentflow implements INode {
|
||||
isStreamable: boolean
|
||||
isLastNode: boolean
|
||||
iterationContext: ICommonObject
|
||||
isStructuredOutput?: boolean
|
||||
}): Promise<{
|
||||
response: AIMessageChunk
|
||||
usedTools: IUsedTool[]
|
||||
@@ -1803,7 +1955,9 @@ class Agent_Agentflow implements INode {
|
||||
const toolCallDetails = '```json\n' + JSON.stringify(toolCall, null, 2) + '\n```'
|
||||
const responseContent = response.content + `\nAttempting to use tool:\n${toolCallDetails}`
|
||||
response.content = responseContent
|
||||
sseStreamer?.streamTokenEvent(chatId, responseContent)
|
||||
if (!isStructuredOutput) {
|
||||
sseStreamer?.streamTokenEvent(chatId, responseContent)
|
||||
}
|
||||
return { response, usedTools, sourceDocuments, artifacts, totalTokens, isWaitingForHumanInput: true }
|
||||
}
|
||||
|
||||
@@ -1909,7 +2063,7 @@ class Agent_Agentflow implements INode {
|
||||
const lastToolOutput = usedTools[0]?.toolOutput || ''
|
||||
const lastToolOutputString = typeof lastToolOutput === 'string' ? lastToolOutput : JSON.stringify(lastToolOutput, null, 2)
|
||||
|
||||
if (sseStreamer) {
|
||||
if (sseStreamer && !isStructuredOutput) {
|
||||
sseStreamer.streamTokenEvent(chatId, lastToolOutputString)
|
||||
}
|
||||
|
||||
@@ -1938,12 +2092,19 @@ class Agent_Agentflow implements INode {
|
||||
let newResponse: AIMessageChunk
|
||||
|
||||
if (isStreamable) {
|
||||
newResponse = await this.handleStreamingResponse(sseStreamer, llmNodeInstance, messages, chatId, abortController)
|
||||
newResponse = await this.handleStreamingResponse(
|
||||
sseStreamer,
|
||||
llmNodeInstance,
|
||||
messages,
|
||||
chatId,
|
||||
abortController,
|
||||
isStructuredOutput
|
||||
)
|
||||
} else {
|
||||
newResponse = await llmNodeInstance.invoke(messages, { signal: abortController?.signal })
|
||||
|
||||
// Stream non-streaming response if this is the last node
|
||||
if (isLastNode && sseStreamer) {
|
||||
if (isLastNode && sseStreamer && !isStructuredOutput) {
|
||||
let responseContent = JSON.stringify(newResponse, null, 2)
|
||||
if (typeof newResponse.content === 'string') {
|
||||
responseContent = newResponse.content
|
||||
@@ -1978,7 +2139,8 @@ class Agent_Agentflow implements INode {
|
||||
llmNodeInstance,
|
||||
isStreamable,
|
||||
isLastNode,
|
||||
iterationContext
|
||||
iterationContext,
|
||||
isStructuredOutput
|
||||
})
|
||||
|
||||
// Merge results from recursive tool calls
|
||||
@@ -2009,7 +2171,8 @@ class Agent_Agentflow implements INode {
|
||||
llmWithoutToolsBind,
|
||||
isStreamable,
|
||||
isLastNode,
|
||||
iterationContext
|
||||
iterationContext,
|
||||
isStructuredOutput = false
|
||||
}: {
|
||||
humanInput: IHumanInput
|
||||
humanInputAction: Record<string, any> | undefined
|
||||
@@ -2024,6 +2187,7 @@ class Agent_Agentflow implements INode {
|
||||
isStreamable: boolean
|
||||
isLastNode: boolean
|
||||
iterationContext: ICommonObject
|
||||
isStructuredOutput?: boolean
|
||||
}): Promise<{
|
||||
response: AIMessageChunk
|
||||
usedTools: IUsedTool[]
|
||||
@@ -2226,7 +2390,7 @@ class Agent_Agentflow implements INode {
|
||||
const lastToolOutput = usedTools[0]?.toolOutput || ''
|
||||
const lastToolOutputString = typeof lastToolOutput === 'string' ? lastToolOutput : JSON.stringify(lastToolOutput, null, 2)
|
||||
|
||||
if (sseStreamer) {
|
||||
if (sseStreamer && !isStructuredOutput) {
|
||||
sseStreamer.streamTokenEvent(chatId, lastToolOutputString)
|
||||
}
|
||||
|
||||
@@ -2257,12 +2421,19 @@ class Agent_Agentflow implements INode {
|
||||
}
|
||||
|
||||
if (isStreamable) {
|
||||
newResponse = await this.handleStreamingResponse(sseStreamer, llmNodeInstance, messages, chatId, abortController)
|
||||
newResponse = await this.handleStreamingResponse(
|
||||
sseStreamer,
|
||||
llmNodeInstance,
|
||||
messages,
|
||||
chatId,
|
||||
abortController,
|
||||
isStructuredOutput
|
||||
)
|
||||
} else {
|
||||
newResponse = await llmNodeInstance.invoke(messages, { signal: abortController?.signal })
|
||||
|
||||
// Stream non-streaming response if this is the last node
|
||||
if (isLastNode && sseStreamer) {
|
||||
if (isLastNode && sseStreamer && !isStructuredOutput) {
|
||||
let responseContent = JSON.stringify(newResponse, null, 2)
|
||||
if (typeof newResponse.content === 'string') {
|
||||
responseContent = newResponse.content
|
||||
@@ -2297,7 +2468,8 @@ class Agent_Agentflow implements INode {
|
||||
llmNodeInstance,
|
||||
isStreamable,
|
||||
isLastNode,
|
||||
iterationContext
|
||||
iterationContext,
|
||||
isStructuredOutput
|
||||
})
|
||||
|
||||
// Merge results from recursive tool calls
|
||||
|
||||
Reference in New Issue
Block a user