Feature/OpenAI Assistant V2 (#2258)

* add gpt4 turbo to assistant

* OpenAI Assistant V2

* update langfuse handler
This commit is contained in:
Henry Heng
2024-04-25 20:14:04 +01:00
committed by GitHub
parent 4782c0f6fc
commit 7360d1d9a6
25 changed files with 23422 additions and 17637 deletions
@@ -1,16 +1,17 @@
import { ICommonObject, IDatabaseEntity, INode, INodeData, INodeOptionsValue, INodeParams, IUsedTool } from '../../../src/Interface'
import OpenAI from 'openai'
import { DataSource } from 'typeorm'
import { getCredentialData, getCredentialParam, getUserHome } from '../../../src/utils'
import { ImageFileContentBlock, TextContentBlock } from 'openai/resources/beta/threads/messages/messages'
import * as fsDefault from 'node:fs'
import * as path from 'node:path'
import { getCredentialData, getCredentialParam } from '../../../src/utils'
import fetch from 'node-fetch'
import { flatten, uniqWith, isEqual } from 'lodash'
import { zodToJsonSchema } from 'zod-to-json-schema'
import { AnalyticHandler } from '../../../src/handler'
import { Moderation, checkInputs, streamResponse } from '../../moderation/Moderation'
import { formatResponse } from '../../outputparsers/OutputParserHelpers'
import { addFileToStorage } from '../../../src/storageUtils'
const lenticularBracketRegex = /【[^】]*】/g
const imageRegex = /<img[^>]*\/>/g
class OpenAIAssistant_Agents implements INode {
label: string
@@ -168,6 +169,9 @@ class OpenAIAssistant_Agents implements INode {
tools = flatten(tools)
const formattedTools = tools?.map((tool: any) => formatToOpenAIAssistantTool(tool)) ?? []
const usedTools: IUsedTool[] = []
const fileAnnotations = []
const assistant = await appDataSource.getRepository(databaseEntities['Assistant']).findOneBy({
id: selectedAssistantId
})
@@ -195,7 +199,7 @@ class OpenAIAssistant_Agents implements INode {
if (formattedTools.length) {
let filteredTools = []
for (const tool of retrievedAssistant.tools) {
if (tool.type === 'code_interpreter' || tool.type === 'retrieval') filteredTools.push(tool)
if (tool.type === 'code_interpreter' || tool.type === 'file_search') filteredTools.push(tool)
}
filteredTools = uniqWith([...filteredTools, ...formattedTools], isEqual)
// filter out tool with empty function
@@ -236,7 +240,8 @@ class OpenAIAssistant_Agents implements INode {
(runStatus === 'cancelled' ||
runStatus === 'completed' ||
runStatus === 'expired' ||
runStatus === 'failed')
runStatus === 'failed' ||
runStatus === 'requires_action')
) {
clearInterval(timeout)
resolve()
@@ -259,11 +264,235 @@ class OpenAIAssistant_Agents implements INode {
// Run assistant thread
const llmIds = await analyticHandlers.onLLMStart('ChatOpenAI', input, parentIds)
const runThread = await openai.beta.threads.runs.create(threadId, {
assistant_id: retrievedAssistant.id
})
const usedTools: IUsedTool[] = []
let text = ''
let runThreadId = ''
let isStreamingStarted = false
if (isStreaming) {
const streamThread = await openai.beta.threads.runs.create(threadId, {
assistant_id: retrievedAssistant.id,
stream: true
})
for await (const event of streamThread) {
if (event.event === 'thread.run.created') {
runThreadId = event.data.id
}
if (event.event === 'thread.message.delta') {
const chunk = event.data.delta.content?.[0]
if (chunk && 'text' in chunk) {
if (chunk.text?.annotations?.length) {
const message_content = chunk.text
const annotations = chunk.text?.annotations
// Iterate over the annotations
for (let index = 0; index < annotations.length; index++) {
const annotation = annotations[index]
let filePath = ''
// Gather citations based on annotation attributes
const file_citation = (annotation as OpenAI.Beta.Threads.Messages.FileCitationAnnotation).file_citation
if (file_citation) {
const cited_file = await openai.files.retrieve(file_citation.file_id)
// eslint-disable-next-line no-useless-escape
const fileName = cited_file.filename.split(/[\/\\]/).pop() ?? cited_file.filename
if (!disableFileDownload) {
filePath = await downloadFile(openAIApiKey, cited_file, fileName, options.chatflowid, threadId)
fileAnnotations.push({
filePath,
fileName
})
}
} else {
const file_path = (annotation as OpenAI.Beta.Threads.Messages.FilePathAnnotation).file_path
if (file_path) {
const cited_file = await openai.files.retrieve(file_path.file_id)
// eslint-disable-next-line no-useless-escape
const fileName = cited_file.filename.split(/[\/\\]/).pop() ?? cited_file.filename
if (!disableFileDownload) {
filePath = await downloadFile(
openAIApiKey,
cited_file,
fileName,
options.chatflowid,
threadId
)
fileAnnotations.push({
filePath,
fileName
})
}
}
}
// Replace the text with a footnote
message_content.value = message_content.value?.replace(
`${annotation.text}`,
`${disableFileDownload ? '' : filePath}`
)
}
// Remove lenticular brackets
message_content.value = message_content.value?.replace(lenticularBracketRegex, '')
text += message_content.value ?? ''
if (message_content.value) {
if (!isStreamingStarted) {
isStreamingStarted = true
socketIO.to(socketIOClientId).emit('start', message_content.value)
}
socketIO.to(socketIOClientId).emit('token', message_content.value)
}
if (fileAnnotations.length) {
if (!isStreamingStarted) {
isStreamingStarted = true
socketIO.to(socketIOClientId).emit('start', '')
}
socketIO.to(socketIOClientId).emit('fileAnnotations', fileAnnotations)
}
} else {
text += chunk.text?.value
if (!isStreamingStarted) {
isStreamingStarted = true
socketIO.to(socketIOClientId).emit('start', chunk.text?.value)
}
socketIO.to(socketIOClientId).emit('token', chunk.text?.value)
}
}
if (chunk && 'image_file' in chunk && chunk.image_file?.file_id) {
const fileId = chunk.image_file.file_id
const fileObj = await openai.files.retrieve(fileId)
const buffer = await downloadImg(openai, fileId, `${fileObj.filename}.png`, options.chatflowid, threadId)
const base64String = Buffer.from(buffer).toString('base64')
// TODO: Use a file path and retrieve image on the fly. Storing as base64 to localStorage and database will easily hit limits
const imgHTML = `<img src="data:image/png;base64,${base64String}" width="100%" height="max-content" alt="${fileObj.filename}" /><br/>`
text += imgHTML
if (!isStreamingStarted) {
isStreamingStarted = true
socketIO.to(socketIOClientId).emit('start', imgHTML)
}
socketIO.to(socketIOClientId).emit('token', imgHTML)
}
}
if (event.event === 'thread.run.requires_action') {
if (event.data.required_action?.submit_tool_outputs.tool_calls) {
const actions: ICommonObject[] = []
event.data.required_action.submit_tool_outputs.tool_calls.forEach((item) => {
const functionCall = item.function
let args = {}
try {
args = JSON.parse(functionCall.arguments)
} catch (e) {
console.error('Error parsing arguments, default to empty object')
}
actions.push({
tool: functionCall.name,
toolInput: args,
toolCallId: item.id
})
})
const submitToolOutputs = []
for (let i = 0; i < actions.length; i += 1) {
const tool = tools.find((tool: any) => tool.name === actions[i].tool)
if (!tool) continue
// Start tool analytics
const toolIds = await analyticHandlers.onToolStart(tool.name, actions[i].toolInput, parentIds)
try {
const toolOutput = await tool.call(actions[i].toolInput, undefined, undefined, {
sessionId: threadId,
chatId: options.chatId,
input
})
await analyticHandlers.onToolEnd(toolIds, toolOutput)
submitToolOutputs.push({
tool_call_id: actions[i].toolCallId,
output: toolOutput
})
usedTools.push({
tool: tool.name,
toolInput: actions[i].toolInput,
toolOutput
})
} catch (e) {
await analyticHandlers.onToolEnd(toolIds, e)
console.error('Error executing tool', e)
throw new Error(
`Error executing tool. Tool: ${tool.name}. Thread ID: ${threadId}. Run ID: ${runThreadId}`
)
}
}
try {
const stream = openai.beta.threads.runs.submitToolOutputsStream(threadId, runThreadId, {
tool_outputs: submitToolOutputs
})
for await (const event of stream) {
if (event.event === 'thread.message.delta') {
const chunk = event.data.delta.content?.[0]
if (chunk && 'text' in chunk && chunk.text?.value) {
text += chunk.text.value
if (!isStreamingStarted) {
isStreamingStarted = true
socketIO.to(socketIOClientId).emit('start', chunk.text.value)
}
socketIO.to(socketIOClientId).emit('token', chunk.text.value)
}
}
}
socketIO.to(socketIOClientId).emit('usedTools', usedTools)
} catch (error) {
console.error('Error submitting tool outputs:', error)
await openai.beta.threads.runs.cancel(threadId, runThreadId)
const errMsg = `Error submitting tool outputs. Thread ID: ${threadId}. Run ID: ${runThreadId}`
await analyticHandlers.onLLMError(llmIds, errMsg)
await analyticHandlers.onChainError(parentIds, errMsg, true)
throw new Error(errMsg)
}
}
}
}
// List messages
const messages = await openai.beta.threads.messages.list(threadId)
const messageData = messages.data ?? []
const assistantMessages = messageData.filter((msg) => msg.role === 'assistant')
if (!assistantMessages.length) return ''
// Remove images from the logging text
let llmOutput = text.replace(imageRegex, '')
llmOutput = llmOutput.replace('<br/>', '')
await analyticHandlers.onLLMEnd(llmIds, llmOutput)
await analyticHandlers.onChainEnd(parentIds, messageData, true)
return {
text,
usedTools,
fileAnnotations,
assistant: { assistantId: openAIAssistantId, threadId, runId: runThreadId, messages: messageData }
}
}
const promise = (threadId: string, runId: string) => {
return new Promise((resolve, reject) => {
@@ -299,8 +528,7 @@ class OpenAIAssistant_Agents implements INode {
// Start tool analytics
const toolIds = await analyticHandlers.onToolStart(tool.name, actions[i].toolInput, parentIds)
if (options.socketIO && options.socketIOClientId)
options.socketIO.to(options.socketIOClientId).emit('tool', tool.name)
if (socketIO && socketIOClientId) socketIO.to(socketIOClientId).emit('tool', tool.name)
try {
const toolOutput = await tool.call(actions[i].toolInput, undefined, undefined, {
@@ -360,7 +588,10 @@ class OpenAIAssistant_Agents implements INode {
}
// Polling run status
let runThreadId = runThread.id
const runThread = await openai.beta.threads.runs.create(threadId, {
assistant_id: retrievedAssistant.id
})
runThreadId = runThread.id
let state = await promise(threadId, runThread.id)
while (state === 'requires_action') {
state = await promise(threadId, runThread.id)
@@ -389,17 +620,14 @@ class OpenAIAssistant_Agents implements INode {
if (!assistantMessages.length) return ''
let returnVal = ''
const fileAnnotations = []
for (let i = 0; i < assistantMessages[0].content.length; i += 1) {
if (assistantMessages[0].content[i].type === 'text') {
const content = assistantMessages[0].content[i] as TextContentBlock
const content = assistantMessages[0].content[i] as OpenAI.Beta.Threads.Messages.TextContentBlock
if (content.text.annotations) {
const message_content = content.text
const annotations = message_content.annotations
const dirPath = path.join(getUserHome(), '.flowise', 'openai-assistant')
// Iterate over the annotations
for (let index = 0; index < annotations.length; index++) {
const annotation = annotations[index]
@@ -407,13 +635,13 @@ class OpenAIAssistant_Agents implements INode {
// Gather citations based on annotation attributes
const file_citation = (annotation as OpenAI.Beta.Threads.Messages.FileCitationAnnotation).file_citation
if (file_citation) {
const cited_file = await openai.files.retrieve(file_citation.file_id)
// eslint-disable-next-line no-useless-escape
const fileName = cited_file.filename.split(/[\/\\]/).pop() ?? cited_file.filename
filePath = path.join(getUserHome(), '.flowise', 'openai-assistant', fileName)
if (!disableFileDownload) {
await downloadFile(cited_file, filePath, dirPath, openAIApiKey)
filePath = await downloadFile(openAIApiKey, cited_file, fileName, options.chatflowid, threadId)
fileAnnotations.push({
filePath,
fileName
@@ -425,9 +653,8 @@ class OpenAIAssistant_Agents implements INode {
const cited_file = await openai.files.retrieve(file_path.file_id)
// eslint-disable-next-line no-useless-escape
const fileName = cited_file.filename.split(/[\/\\]/).pop() ?? cited_file.filename
filePath = path.join(getUserHome(), '.flowise', 'openai-assistant', fileName)
if (!disableFileDownload) {
await downloadFile(cited_file, filePath, dirPath, openAIApiKey)
filePath = await downloadFile(openAIApiKey, cited_file, fileName, options.chatflowid, threadId)
fileAnnotations.push({
filePath,
fileName
@@ -448,19 +675,14 @@ class OpenAIAssistant_Agents implements INode {
returnVal += content.text.value
}
const lenticularBracketRegex = /【[^】]*】/g
returnVal = returnVal.replace(lenticularBracketRegex, '')
} else {
const content = assistantMessages[0].content[i] as ImageFileContentBlock
const content = assistantMessages[0].content[i] as OpenAI.Beta.Threads.Messages.ImageFileContentBlock
const fileId = content.image_file.file_id
const fileObj = await openai.files.retrieve(fileId)
const dirPath = path.join(getUserHome(), '.flowise', 'openai-assistant')
const filePath = path.join(getUserHome(), '.flowise', 'openai-assistant', `${fileObj.filename}.png`)
await downloadImg(openai, fileId, filePath, dirPath)
const bitmap = fsDefault.readFileSync(filePath)
const base64String = Buffer.from(bitmap).toString('base64')
const buffer = await downloadImg(openai, fileId, `${fileObj.filename}.png`, options.chatflowid, threadId)
const base64String = Buffer.from(buffer).toString('base64')
// TODO: Use a file path and retrieve image on the fly. Storing as base64 to localStorage and database will easily hit limits
const imgHTML = `<img src="data:image/png;base64,${base64String}" width="100%" height="max-content" alt="${fileObj.filename}" /><br/>`
@@ -468,7 +690,6 @@ class OpenAIAssistant_Agents implements INode {
}
}
const imageRegex = /<img[^>]*\/>/g
let llmOutput = returnVal.replace(imageRegex, '')
llmOutput = llmOutput.replace('<br/>', '')
@@ -488,7 +709,7 @@ class OpenAIAssistant_Agents implements INode {
}
}
const downloadImg = async (openai: OpenAI, fileId: string, filePath: string, dirPath: string) => {
const downloadImg = async (openai: OpenAI, fileId: string, fileName: string, ...paths: string[]) => {
const response = await openai.files.content(fileId)
// Extract the binary data from the Response object
@@ -496,15 +717,14 @@ const downloadImg = async (openai: OpenAI, fileId: string, filePath: string, dir
// Convert the binary data to a Buffer
const image_data_buffer = Buffer.from(image_data)
const mime = 'image/png'
// Save the image to a specific location
if (!fsDefault.existsSync(dirPath)) {
fsDefault.mkdirSync(path.dirname(filePath), { recursive: true })
}
fsDefault.writeFileSync(filePath, image_data_buffer)
await addFileToStorage(mime, image_data_buffer, fileName, ...paths)
return image_data_buffer
}
const downloadFile = async (fileObj: any, filePath: string, dirPath: string, openAIApiKey: string) => {
const downloadFile = async (openAIApiKey: string, fileObj: any, fileName: string, ...paths: string[]) => {
try {
const response = await fetch(`https://api.openai.com/v1/files/${fileObj.id}/content`, {
method: 'GET',
@@ -515,20 +735,17 @@ const downloadFile = async (fileObj: any, filePath: string, dirPath: string, ope
throw new Error(`HTTP error! status: ${response.status}`)
}
await new Promise<void>((resolve, reject) => {
if (!fsDefault.existsSync(dirPath)) {
fsDefault.mkdirSync(path.dirname(filePath), { recursive: true })
}
const dest = fsDefault.createWriteStream(filePath)
response.body.pipe(dest)
response.body.on('end', () => resolve())
dest.on('error', reject)
})
// Extract the binary data from the Response object
const data = await response.arrayBuffer()
// eslint-disable-next-line no-console
console.log('File downloaded and written to', filePath)
// Convert the binary data to a Buffer
const data_buffer = Buffer.from(data)
const mime = 'application/octet-stream'
return await addFileToStorage(mime, data_buffer, fileName, ...paths)
} catch (error) {
console.error('Error downloading or writing the file:', error)
return ''
}
}