mirror of
https://github.com/farcasclaudiu/Flowise.git
synced 2026-06-28 19:00:59 +03:00
Several features for OpenAPI toolkit and OpenAI Assistants (#3989)
* Allows 'x-strict' attribute in OpenAPI spec tool and other json spec objects, this allows the OpenAI Assistant to have function calls with 'strict' mode. Also allows the OpenAI assistant to call several tools in the same run. And adds a checkbox 'remove Nulls' for the OpenAPI toolkit so that parameters with null values are not passed to the backend api. * fix lint errors --------- Co-authored-by: Olivier Schiavo <olivier.schiavo@wengo.com>
This commit is contained in:
@@ -18,6 +18,7 @@ import { AnalyticHandler } from '../../../src/handler'
|
||||
import { Moderation, checkInputs, streamResponse } from '../../moderation/Moderation'
|
||||
import { formatResponse } from '../../outputparsers/OutputParserHelpers'
|
||||
import { addSingleFileToStorage } from '../../../src/storageUtils'
|
||||
import { DynamicStructuredTool } from '../../tools/OpenAPIToolkit/core'
|
||||
|
||||
const lenticularBracketRegex = /【[^】]*】/g
|
||||
const imageRegex = /<img[^>]*\/>/g
|
||||
@@ -504,7 +505,6 @@ class OpenAIAssistant_Agents implements INode {
|
||||
toolCallId: item.id
|
||||
})
|
||||
})
|
||||
|
||||
const submitToolOutputs = []
|
||||
for (let i = 0; i < actions.length; i += 1) {
|
||||
const tool = tools.find((tool: any) => tool.name === actions[i].tool)
|
||||
@@ -539,30 +539,23 @@ class OpenAIAssistant_Agents implements INode {
|
||||
}
|
||||
|
||||
try {
|
||||
const stream = openai.beta.threads.runs.submitToolOutputsStream(threadId, runThreadId, {
|
||||
tool_outputs: submitToolOutputs
|
||||
await handleToolSubmission({
|
||||
openai,
|
||||
threadId,
|
||||
runThreadId,
|
||||
submitToolOutputs,
|
||||
tools,
|
||||
analyticHandlers,
|
||||
parentIds,
|
||||
llmIds,
|
||||
sseStreamer,
|
||||
chatId,
|
||||
options,
|
||||
input,
|
||||
usedTools,
|
||||
text,
|
||||
isStreamingStarted
|
||||
})
|
||||
|
||||
for await (const event of stream) {
|
||||
if (event.event === 'thread.message.delta') {
|
||||
const chunk = event.data.delta.content?.[0]
|
||||
if (chunk && 'text' in chunk && chunk.text?.value) {
|
||||
text += chunk.text.value
|
||||
if (!isStreamingStarted) {
|
||||
isStreamingStarted = true
|
||||
if (sseStreamer) {
|
||||
sseStreamer.streamStartEvent(chatId, chunk.text.value)
|
||||
}
|
||||
}
|
||||
if (sseStreamer) {
|
||||
sseStreamer.streamTokenEvent(chatId, chunk.text.value)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (sseStreamer) {
|
||||
sseStreamer.streamUsedToolsEvent(chatId, usedTools)
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('Error submitting tool outputs:', error)
|
||||
await openai.beta.threads.runs.cancel(threadId, runThreadId)
|
||||
@@ -634,7 +627,6 @@ class OpenAIAssistant_Agents implements INode {
|
||||
toolCallId: item.id
|
||||
})
|
||||
})
|
||||
|
||||
const submitToolOutputs = []
|
||||
for (let i = 0; i < actions.length; i += 1) {
|
||||
const tool = tools.find((tool: any) => tool.name === actions[i].tool)
|
||||
@@ -895,15 +887,212 @@ const downloadFile = async (openAIApiKey: string, fileObj: any, fileName: string
|
||||
}
|
||||
}
|
||||
|
||||
interface ToolSubmissionParams {
|
||||
openai: OpenAI
|
||||
threadId: string
|
||||
runThreadId: string
|
||||
submitToolOutputs: any[]
|
||||
tools: any[]
|
||||
analyticHandlers: AnalyticHandler
|
||||
parentIds: ICommonObject
|
||||
llmIds: ICommonObject
|
||||
sseStreamer: IServerSideEventStreamer
|
||||
chatId: string
|
||||
options: ICommonObject
|
||||
input: string
|
||||
usedTools: IUsedTool[]
|
||||
text: string
|
||||
isStreamingStarted: boolean
|
||||
}
|
||||
|
||||
interface ToolSubmissionResult {
|
||||
text: string
|
||||
isStreamingStarted: boolean
|
||||
}
|
||||
|
||||
async function handleToolSubmission(params: ToolSubmissionParams): Promise<ToolSubmissionResult> {
|
||||
const {
|
||||
openai,
|
||||
threadId,
|
||||
runThreadId,
|
||||
submitToolOutputs,
|
||||
tools,
|
||||
analyticHandlers,
|
||||
parentIds,
|
||||
llmIds,
|
||||
sseStreamer,
|
||||
chatId,
|
||||
options,
|
||||
input,
|
||||
usedTools
|
||||
} = params
|
||||
|
||||
let updatedText = params.text
|
||||
let updatedIsStreamingStarted = params.isStreamingStarted
|
||||
|
||||
const stream = openai.beta.threads.runs.submitToolOutputsStream(threadId, runThreadId, {
|
||||
tool_outputs: submitToolOutputs
|
||||
})
|
||||
|
||||
try {
|
||||
for await (const event of stream) {
|
||||
if (event.event === 'thread.message.delta') {
|
||||
const chunk = event.data.delta.content?.[0]
|
||||
if (chunk && 'text' in chunk && chunk.text?.value) {
|
||||
updatedText += chunk.text.value
|
||||
if (!updatedIsStreamingStarted) {
|
||||
updatedIsStreamingStarted = true
|
||||
if (sseStreamer) {
|
||||
sseStreamer.streamStartEvent(chatId, chunk.text.value)
|
||||
}
|
||||
}
|
||||
if (sseStreamer) {
|
||||
sseStreamer.streamTokenEvent(chatId, chunk.text.value)
|
||||
}
|
||||
}
|
||||
} else if (event.event === 'thread.run.requires_action') {
|
||||
if (event.data.required_action?.submit_tool_outputs.tool_calls) {
|
||||
const actions: ICommonObject[] = []
|
||||
|
||||
event.data.required_action.submit_tool_outputs.tool_calls.forEach((item) => {
|
||||
const functionCall = item.function
|
||||
let args = {}
|
||||
try {
|
||||
args = JSON.parse(functionCall.arguments)
|
||||
} catch (e) {
|
||||
console.error('Error parsing arguments, default to empty object')
|
||||
}
|
||||
actions.push({
|
||||
tool: functionCall.name,
|
||||
toolInput: args,
|
||||
toolCallId: item.id
|
||||
})
|
||||
})
|
||||
|
||||
const nestedToolOutputs = []
|
||||
for (let i = 0; i < actions.length; i += 1) {
|
||||
const tool = tools.find((tool: any) => tool.name === actions[i].tool)
|
||||
if (!tool) continue
|
||||
|
||||
const toolIds = await analyticHandlers.onToolStart(tool.name, actions[i].toolInput, parentIds)
|
||||
|
||||
try {
|
||||
const toolOutput = await tool.call(actions[i].toolInput, undefined, undefined, {
|
||||
sessionId: threadId,
|
||||
chatId: options.chatId,
|
||||
input
|
||||
})
|
||||
await analyticHandlers.onToolEnd(toolIds, toolOutput)
|
||||
nestedToolOutputs.push({
|
||||
tool_call_id: actions[i].toolCallId,
|
||||
output: toolOutput
|
||||
})
|
||||
usedTools.push({
|
||||
tool: tool.name,
|
||||
toolInput: actions[i].toolInput,
|
||||
toolOutput
|
||||
})
|
||||
} catch (e) {
|
||||
await analyticHandlers.onToolEnd(toolIds, e)
|
||||
console.error('Error executing tool', e)
|
||||
throw new Error(`Error executing tool. Tool: ${tool.name}. Thread ID: ${threadId}. Run ID: ${runThreadId}`)
|
||||
}
|
||||
}
|
||||
|
||||
// Recursively handle nested tool submissions
|
||||
const result = await handleToolSubmission({
|
||||
openai,
|
||||
threadId,
|
||||
runThreadId,
|
||||
submitToolOutputs: nestedToolOutputs,
|
||||
tools,
|
||||
analyticHandlers,
|
||||
parentIds,
|
||||
llmIds,
|
||||
sseStreamer,
|
||||
chatId,
|
||||
options,
|
||||
input,
|
||||
usedTools,
|
||||
text: updatedText,
|
||||
isStreamingStarted: updatedIsStreamingStarted
|
||||
})
|
||||
updatedText = result.text
|
||||
updatedIsStreamingStarted = result.isStreamingStarted
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (sseStreamer) {
|
||||
sseStreamer.streamUsedToolsEvent(chatId, usedTools)
|
||||
}
|
||||
|
||||
return {
|
||||
text: updatedText,
|
||||
isStreamingStarted: updatedIsStreamingStarted
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('Error submitting tool outputs:', error)
|
||||
await openai.beta.threads.runs.cancel(threadId, runThreadId)
|
||||
|
||||
const errMsg = `Error submitting tool outputs. Thread ID: ${threadId}. Run ID: ${runThreadId}`
|
||||
|
||||
await analyticHandlers.onLLMError(llmIds, errMsg)
|
||||
await analyticHandlers.onChainError(parentIds, errMsg, true)
|
||||
|
||||
throw new Error(errMsg)
|
||||
}
|
||||
}
|
||||
|
||||
interface JSONSchema {
|
||||
type?: string
|
||||
properties?: Record<string, JSONSchema>
|
||||
additionalProperties?: boolean
|
||||
required?: string[]
|
||||
[key: string]: any
|
||||
}
|
||||
|
||||
const formatToOpenAIAssistantTool = (tool: any): OpenAI.Beta.FunctionTool => {
|
||||
return {
|
||||
const parameters = zodToJsonSchema(tool.schema) as JSONSchema
|
||||
|
||||
// For strict tools, we need to:
|
||||
// 1. Set additionalProperties to false
|
||||
// 2. Make all parameters required
|
||||
// 3. Set the strict flag
|
||||
if (tool instanceof DynamicStructuredTool && tool.isStrict()) {
|
||||
// Get all property names from the schema
|
||||
const properties = parameters.properties || {}
|
||||
const allPropertyNames = Object.keys(properties)
|
||||
|
||||
parameters.additionalProperties = false
|
||||
parameters.required = allPropertyNames
|
||||
|
||||
// Handle nested objects
|
||||
for (const [_, prop] of Object.entries(properties)) {
|
||||
if (prop.type === 'object') {
|
||||
prop.additionalProperties = false
|
||||
if (prop.properties) {
|
||||
prop.required = Object.keys(prop.properties)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const functionTool: OpenAI.Beta.FunctionTool = {
|
||||
type: 'function',
|
||||
function: {
|
||||
name: tool.name,
|
||||
description: tool.description,
|
||||
parameters: zodToJsonSchema(tool.schema)
|
||||
parameters
|
||||
}
|
||||
}
|
||||
|
||||
// Add strict property if the tool is marked as strict
|
||||
if (tool instanceof DynamicStructuredTool && tool.isStrict()) {
|
||||
;(functionTool.function as any).strict = true
|
||||
}
|
||||
|
||||
return functionTool
|
||||
}
|
||||
|
||||
module.exports = { nodeClass: OpenAIAssistant_Agents }
|
||||
|
||||
Reference in New Issue
Block a user