mirror of
https://github.com/farcasclaudiu/Flowise.git
synced 2026-06-28 21:00:58 +03:00
add upload files and tool features
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
import { ICommonObject, IDatabaseEntity, INode, INodeData, INodeOptionsValue, INodeParams } from '../../../src/Interface'
|
||||
import { ICommonObject, IDatabaseEntity, INode, INodeData, INodeOptionsValue, INodeParams, IUsedTool } from '../../../src/Interface'
|
||||
import OpenAI from 'openai'
|
||||
import { DataSource } from 'typeorm'
|
||||
import { getCredentialData, getCredentialParam, getUserHome } from '../../../src/utils'
|
||||
@@ -6,6 +6,8 @@ import { MessageContentImageFile, MessageContentText } from 'openai/resources/be
|
||||
import * as fsDefault from 'node:fs'
|
||||
import * as path from 'node:path'
|
||||
import fetch from 'node-fetch'
|
||||
import { flatten } from 'lodash'
|
||||
import { zodToJsonSchema } from 'zod-to-json-schema'
|
||||
|
||||
class OpenAIAssistant_Agents implements INode {
|
||||
label: string
|
||||
@@ -33,6 +35,12 @@ class OpenAIAssistant_Agents implements INode {
|
||||
name: 'selectedAssistant',
|
||||
type: 'asyncOptions',
|
||||
loadMethod: 'listAssistants'
|
||||
},
|
||||
{
|
||||
label: 'Allowed Tools',
|
||||
name: 'tools',
|
||||
type: 'Tool',
|
||||
list: true
|
||||
}
|
||||
]
|
||||
}
|
||||
@@ -78,19 +86,28 @@ class OpenAIAssistant_Agents implements INode {
|
||||
id: selectedAssistantId
|
||||
})
|
||||
|
||||
if (!assistant) throw new Error(`Assistant ${selectedAssistantId} not found`)
|
||||
if (!assistant) {
|
||||
options.logger.error(`Assistant ${selectedAssistantId} not found`)
|
||||
return
|
||||
}
|
||||
|
||||
if (!sessionId && options.chatId) {
|
||||
const chatmsg = await appDataSource.getRepository(databaseEntities['ChatMessage']).findOneBy({
|
||||
chatId: options.chatId
|
||||
})
|
||||
if (!chatmsg) throw new Error(`Chat Message with Chat Id: ${options.chatId} not found`)
|
||||
if (!chatmsg) {
|
||||
options.logger.error(`Chat Message with Chat Id: ${options.chatId} not found`)
|
||||
return
|
||||
}
|
||||
sessionId = chatmsg.sessionId
|
||||
}
|
||||
|
||||
const credentialData = await getCredentialData(assistant.credential ?? '', options)
|
||||
const openAIApiKey = getCredentialParam('openAIApiKey', credentialData, nodeData)
|
||||
if (!openAIApiKey) throw new Error(`OpenAI ApiKey not found`)
|
||||
if (!openAIApiKey) {
|
||||
options.logger.error(`OpenAI ApiKey not found`)
|
||||
return
|
||||
}
|
||||
|
||||
const openai = new OpenAI({ apiKey: openAIApiKey })
|
||||
options.logger.info(`Clearing OpenAI Thread ${sessionId}`)
|
||||
@@ -102,6 +119,9 @@ class OpenAIAssistant_Agents implements INode {
|
||||
const selectedAssistantId = nodeData.inputs?.selectedAssistant as string
|
||||
const appDataSource = options.appDataSource as DataSource
|
||||
const databaseEntities = options.databaseEntities as IDatabaseEntity
|
||||
let tools = nodeData.inputs?.tools
|
||||
tools = flatten(tools)
|
||||
const formattedTools = tools?.map((tool: any) => formatToOpenAIAssistantTool(tool)) ?? []
|
||||
|
||||
const assistant = await appDataSource.getRepository(databaseEntities['Assistant']).findOneBy({
|
||||
id: selectedAssistantId
|
||||
@@ -116,83 +136,143 @@ class OpenAIAssistant_Agents implements INode {
|
||||
const openai = new OpenAI({ apiKey: openAIApiKey })
|
||||
|
||||
// Retrieve assistant
|
||||
const assistantDetails = JSON.parse(assistant.details)
|
||||
const openAIAssistantId = assistantDetails.id
|
||||
const retrievedAssistant = await openai.beta.assistants.retrieve(openAIAssistantId)
|
||||
try {
|
||||
const assistantDetails = JSON.parse(assistant.details)
|
||||
const openAIAssistantId = assistantDetails.id
|
||||
const retrievedAssistant = await openai.beta.assistants.retrieve(openAIAssistantId)
|
||||
|
||||
const chatmessage = await appDataSource.getRepository(databaseEntities['ChatMessage']).findOneBy({
|
||||
chatId: options.chatId
|
||||
})
|
||||
|
||||
let threadId = ''
|
||||
if (!chatmessage) {
|
||||
const thread = await openai.beta.threads.create({})
|
||||
threadId = thread.id
|
||||
} else {
|
||||
const thread = await openai.beta.threads.retrieve(chatmessage.sessionId)
|
||||
threadId = thread.id
|
||||
}
|
||||
|
||||
// Add message to thread
|
||||
await openai.beta.threads.messages.create(threadId, {
|
||||
role: 'user',
|
||||
content: input
|
||||
})
|
||||
|
||||
// Run assistant thread
|
||||
const runThread = await openai.beta.threads.runs.create(threadId, {
|
||||
assistant_id: retrievedAssistant.id
|
||||
})
|
||||
|
||||
const promise = (threadId: string, runId: string) => {
|
||||
return new Promise((resolve, reject) => {
|
||||
const timeout = setInterval(async () => {
|
||||
const run = await openai.beta.threads.runs.retrieve(threadId, runId)
|
||||
const state = run.status
|
||||
if (state === 'completed') {
|
||||
clearInterval(timeout)
|
||||
resolve(run)
|
||||
} else if (state === 'cancelled' || state === 'expired' || state === 'failed') {
|
||||
clearInterval(timeout)
|
||||
reject(new Error(`Error processing thread: ${state}, Thread ID: ${threadId}, Run ID: ${runId}`))
|
||||
}
|
||||
}, 500)
|
||||
})
|
||||
}
|
||||
|
||||
// Polling run status
|
||||
await promise(threadId, runThread.id)
|
||||
|
||||
// List messages
|
||||
const messages = await openai.beta.threads.messages.list(threadId)
|
||||
const messageData = messages.data ?? []
|
||||
const assistantMessages = messageData.filter((msg) => msg.role === 'assistant')
|
||||
if (!assistantMessages.length) return ''
|
||||
|
||||
let returnVal = ''
|
||||
for (let i = 0; i < assistantMessages[0].content.length; i += 1) {
|
||||
if (assistantMessages[0].content[i].type === 'text') {
|
||||
const content = assistantMessages[0].content[i] as MessageContentText
|
||||
returnVal += content.text.value
|
||||
|
||||
//TODO: handle annotations
|
||||
} else {
|
||||
const content = assistantMessages[0].content[i] as MessageContentImageFile
|
||||
const fileId = content.image_file.file_id
|
||||
const fileObj = await openai.files.retrieve(fileId)
|
||||
const filePath = path.join(getUserHome(), '.flowise', 'openai-assistant', `${fileObj.filename}.png`)
|
||||
|
||||
await downloadFile(fileObj, filePath, openAIApiKey)
|
||||
|
||||
const bitmap = fsDefault.readFileSync(filePath)
|
||||
const base64String = Buffer.from(bitmap).toString('base64')
|
||||
|
||||
const imgHTML = `<img src="data:image/png;base64,${base64String}" width="100%" height="max-content" alt="${fileObj.filename}" /><br/>`
|
||||
returnVal += imgHTML
|
||||
if (formattedTools.length) {
|
||||
await openai.beta.assistants.update(openAIAssistantId, { tools: formattedTools })
|
||||
}
|
||||
}
|
||||
|
||||
return { text: returnVal, assistant: { assistantId: openAIAssistantId, threadId, runId: runThread.id, messages: messageData } }
|
||||
const chatmessage = await appDataSource.getRepository(databaseEntities['ChatMessage']).findOneBy({
|
||||
chatId: options.chatId
|
||||
})
|
||||
|
||||
let threadId = ''
|
||||
if (!chatmessage) {
|
||||
const thread = await openai.beta.threads.create({})
|
||||
threadId = thread.id
|
||||
} else {
|
||||
const thread = await openai.beta.threads.retrieve(chatmessage.sessionId)
|
||||
threadId = thread.id
|
||||
}
|
||||
|
||||
// Add message to thread
|
||||
await openai.beta.threads.messages.create(threadId, {
|
||||
role: 'user',
|
||||
content: input
|
||||
})
|
||||
|
||||
// Run assistant thread
|
||||
const runThread = await openai.beta.threads.runs.create(threadId, {
|
||||
assistant_id: retrievedAssistant.id
|
||||
})
|
||||
|
||||
const usedTools: IUsedTool[] = []
|
||||
|
||||
const promise = (threadId: string, runId: string) => {
|
||||
return new Promise((resolve, reject) => {
|
||||
const timeout = setInterval(async () => {
|
||||
const run = await openai.beta.threads.runs.retrieve(threadId, runId)
|
||||
const state = run.status
|
||||
if (state === 'completed') {
|
||||
clearInterval(timeout)
|
||||
resolve(state)
|
||||
} else if (state === 'requires_action') {
|
||||
if (run.required_action?.submit_tool_outputs.tool_calls) {
|
||||
clearInterval(timeout)
|
||||
const actions: ICommonObject[] = []
|
||||
run.required_action.submit_tool_outputs.tool_calls.forEach((item) => {
|
||||
const functionCall = item.function
|
||||
const args = JSON.parse(functionCall.arguments)
|
||||
actions.push({
|
||||
tool: functionCall.name,
|
||||
toolInput: args,
|
||||
toolCallId: item.id
|
||||
})
|
||||
})
|
||||
|
||||
const submitToolOutputs = []
|
||||
for (let i = 0; i < actions.length; i += 1) {
|
||||
const tool = tools.find((tool: any) => tool.name === actions[i].tool)
|
||||
if (!tool) continue
|
||||
const toolOutput = await tool.call(actions[i].toolInput)
|
||||
submitToolOutputs.push({
|
||||
tool_call_id: actions[i].toolCallId,
|
||||
output: toolOutput
|
||||
})
|
||||
usedTools.push({
|
||||
tool: tool.name,
|
||||
toolInput: actions[i].toolInput,
|
||||
toolOutput
|
||||
})
|
||||
}
|
||||
|
||||
if (submitToolOutputs.length) {
|
||||
await openai.beta.threads.runs.submitToolOutputs(threadId, runId, {
|
||||
tool_outputs: submitToolOutputs
|
||||
})
|
||||
resolve(state)
|
||||
} else {
|
||||
reject(
|
||||
new Error(
|
||||
`Error processing thread: ${state}, Thread ID: ${threadId}, Run ID: ${runId}. submit_tool_outputs.tool_calls are empty`
|
||||
)
|
||||
)
|
||||
}
|
||||
}
|
||||
} else if (state === 'cancelled' || state === 'expired' || state === 'failed') {
|
||||
clearInterval(timeout)
|
||||
reject(new Error(`Error processing thread: ${state}, Thread ID: ${threadId}, Run ID: ${runId}`))
|
||||
}
|
||||
}, 500)
|
||||
})
|
||||
}
|
||||
|
||||
// Polling run status
|
||||
let state = await promise(threadId, runThread.id)
|
||||
while (state === 'requires_action') {
|
||||
state = await promise(threadId, runThread.id)
|
||||
}
|
||||
|
||||
// List messages
|
||||
const messages = await openai.beta.threads.messages.list(threadId)
|
||||
const messageData = messages.data ?? []
|
||||
const assistantMessages = messageData.filter((msg) => msg.role === 'assistant')
|
||||
if (!assistantMessages.length) return ''
|
||||
|
||||
let returnVal = ''
|
||||
for (let i = 0; i < assistantMessages[0].content.length; i += 1) {
|
||||
if (assistantMessages[0].content[i].type === 'text') {
|
||||
const content = assistantMessages[0].content[i] as MessageContentText
|
||||
returnVal += content.text.value
|
||||
|
||||
//TODO: handle annotations
|
||||
} else {
|
||||
const content = assistantMessages[0].content[i] as MessageContentImageFile
|
||||
const fileId = content.image_file.file_id
|
||||
const fileObj = await openai.files.retrieve(fileId)
|
||||
const filePath = path.join(getUserHome(), '.flowise', 'openai-assistant', `${fileObj.filename}.png`)
|
||||
|
||||
await downloadFile(fileObj, filePath, openAIApiKey)
|
||||
|
||||
const bitmap = fsDefault.readFileSync(filePath)
|
||||
const base64String = Buffer.from(bitmap).toString('base64')
|
||||
|
||||
const imgHTML = `<img src="data:image/png;base64,${base64String}" width="100%" height="max-content" alt="${fileObj.filename}" /><br/>`
|
||||
returnVal += imgHTML
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
text: returnVal,
|
||||
usedTools,
|
||||
assistant: { assistantId: openAIAssistantId, threadId, runId: runThread.id, messages: messageData }
|
||||
}
|
||||
} catch (error) {
|
||||
throw new Error(error)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -221,4 +301,15 @@ const downloadFile = async (fileObj: any, filePath: string, openAIApiKey: string
|
||||
}
|
||||
}
|
||||
|
||||
const formatToOpenAIAssistantTool = (tool: any): OpenAI.Beta.AssistantCreateParams.AssistantToolsFunction => {
|
||||
return {
|
||||
type: 'function',
|
||||
function: {
|
||||
name: tool.name,
|
||||
description: tool.description,
|
||||
parameters: zodToJsonSchema(tool.schema)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = { nodeClass: OpenAIAssistant_Agents }
|
||||
|
||||
Reference in New Issue
Block a user