Feature/Full File Uploads & Message Delete API (#3314)

* add functionality for full file uploads, add remove messages from view dialog and API

* add attachments swagger

* update question to include uploadedFilesContent

* make config dialog modal lg size
This commit is contained in:
Henry Heng
2024-10-23 11:00:46 +01:00
committed by GitHub
parent 116d02d0bc
commit 53e504c32f
31 changed files with 1012 additions and 193 deletions
+44 -12
View File
@@ -63,7 +63,8 @@ export const buildAgentGraph = async (
isInternal: boolean,
baseURL?: string,
sseStreamer?: IServerSideEventStreamer,
shouldStreamResponse?: boolean
shouldStreamResponse?: boolean,
uploadedFilesContent?: string
): Promise<any> => {
try {
const appServer = getRunningExpressApp()
@@ -129,7 +130,8 @@ export const buildAgentGraph = async (
cachePool: appServer.cachePool,
isUpsert: false,
uploads: incomingInput.uploads,
baseURL
baseURL,
uploadedFilesContent
})
const options = {
@@ -188,7 +190,8 @@ export const buildAgentGraph = async (
chatHistory,
incomingInput?.overrideConfig,
sessionId || chatId,
seqAgentNodes.some((node) => node.data.inputs?.summarization)
seqAgentNodes.some((node) => node.data.inputs?.summarization),
uploadedFilesContent
)
} else {
isSequential = true
@@ -204,7 +207,8 @@ export const buildAgentGraph = async (
chatHistory,
incomingInput?.overrideConfig,
sessionId || chatId,
incomingInput.action
incomingInput.action,
uploadedFilesContent
)
}
@@ -348,7 +352,6 @@ export const buildAgentGraph = async (
if (isSequential && !finalResult && agentReasoning.length) {
const lastMessages = agentReasoning[agentReasoning.length - 1].messages
const lastAgentReasoningMessage = lastMessages[lastMessages.length - 1]
// If last message is an AI Message with tool calls, that means the last node was interrupted
if (lastMessageRaw.tool_calls && lastMessageRaw.tool_calls.length > 0) {
// The last node that got interrupted
@@ -456,6 +459,7 @@ export const buildAgentGraph = async (
* @param {ICommonObject} overrideConfig
* @param {string} threadId
* @param {boolean} summarization
* @param {string} uploadedFilesContent,
*/
const compileMultiAgentsGraph = async (
chatflow: IChatFlow,
@@ -470,7 +474,8 @@ const compileMultiAgentsGraph = async (
chatHistory: IMessage[] = [],
overrideConfig?: ICommonObject,
threadId?: string,
summarization?: boolean
summarization?: boolean,
uploadedFilesContent?: string
) => {
const appServer = getRunningExpressApp()
const channels: ITeamState = {
@@ -502,7 +507,15 @@ const compileMultiAgentsGraph = async (
let flowNodeData = cloneDeep(workerNode.data)
if (overrideConfig) flowNodeData = replaceInputsWithConfig(flowNodeData, overrideConfig)
flowNodeData = await resolveVariables(appServer.AppDataSource, flowNodeData, reactflowNodes, question, chatHistory, overrideConfig)
flowNodeData = await resolveVariables(
appServer.AppDataSource,
flowNodeData,
reactflowNodes,
question,
chatHistory,
overrideConfig,
uploadedFilesContent
)
try {
const workerResult: IMultiAgentNode = await newNodeInstance.init(flowNodeData, question, options)
@@ -533,7 +546,15 @@ const compileMultiAgentsGraph = async (
let flowNodeData = cloneDeep(supervisorNode.data)
if (overrideConfig) flowNodeData = replaceInputsWithConfig(flowNodeData, overrideConfig)
flowNodeData = await resolveVariables(appServer.AppDataSource, flowNodeData, reactflowNodes, question, chatHistory, overrideConfig)
flowNodeData = await resolveVariables(
appServer.AppDataSource,
flowNodeData,
reactflowNodes,
question,
chatHistory,
overrideConfig,
uploadedFilesContent
)
if (flowNodeData.inputs) flowNodeData.inputs.workerNodes = supervisorWorkers[supervisor]
@@ -603,9 +624,10 @@ const compileMultiAgentsGraph = async (
}
// Return stream result as we should only have 1 supervisor
const finalQuestion = uploadedFilesContent ? `${uploadedFilesContent}\n\n${question}` : question
return await graph.stream(
{
messages: [...prependMessages, new HumanMessage({ content: question })]
messages: [...prependMessages, new HumanMessage({ content: finalQuestion })]
},
{ recursionLimit: supervisorResult?.recursionLimit ?? 100, callbacks: [loggerHandler, ...callbacks], configurable: config }
)
@@ -641,7 +663,8 @@ const compileSeqAgentsGraph = async (
chatHistory: IMessage[] = [],
overrideConfig?: ICommonObject,
threadId?: string,
action?: IAction
action?: IAction,
uploadedFilesContent?: string
) => {
const appServer = getRunningExpressApp()
@@ -693,7 +716,15 @@ const compileSeqAgentsGraph = async (
flowNodeData = cloneDeep(node.data)
if (overrideConfig) flowNodeData = replaceInputsWithConfig(flowNodeData, overrideConfig)
flowNodeData = await resolveVariables(appServer.AppDataSource, flowNodeData, reactflowNodes, question, chatHistory, overrideConfig)
flowNodeData = await resolveVariables(
appServer.AppDataSource,
flowNodeData,
reactflowNodes,
question,
chatHistory,
overrideConfig,
uploadedFilesContent
)
const seqAgentNode: ISeqAgentNode = await newNodeInstance.init(flowNodeData, question, options)
return seqAgentNode
@@ -997,8 +1028,9 @@ const compileSeqAgentsGraph = async (
}
}
const finalQuestion = uploadedFilesContent ? `${uploadedFilesContent}\n\n${question}` : question
let humanMsg: { messages: HumanMessage[] | ToolMessage[] } | null = {
messages: [...prependMessages, new HumanMessage({ content: question })]
messages: [...prependMessages, new HumanMessage({ content: finalQuestion })]
}
if (action && action.mapping && question === action.mapping.approve) {
+28 -13
View File
@@ -19,7 +19,7 @@ import {
IReactFlowObject,
IReactFlowNode,
IDepthQueue,
chatType,
ChatType,
IChatMessage,
IChatFlow,
IReactFlowEdge
@@ -88,12 +88,14 @@ export const utilBuildChatflow = async (req: Request, isInternal: boolean = fals
}
let fileUploads: IFileUpload[] = []
let uploadedFilesContent = ''
if (incomingInput.uploads) {
fileUploads = incomingInput.uploads
for (let i = 0; i < fileUploads.length; i += 1) {
const upload = fileUploads[i]
if ((upload.type === 'file' || upload.type === 'audio') && upload.data) {
// if upload in an image, a rag file, or audio
if ((upload.type === 'file' || upload.type === 'file:rag' || upload.type === 'audio') && upload.data) {
const filename = upload.name
const splitDataURI = upload.data.split(',')
const bf = Buffer.from(splitDataURI.pop() || '', 'base64')
@@ -139,6 +141,13 @@ export const utilBuildChatflow = async (req: Request, isInternal: boolean = fals
}
}
}
if (upload.type === 'file:full' && upload.data) {
upload.type = 'stored-file:full'
// Omit upload.data since we don't store the content in database
uploadedFilesContent += `<doc name='${upload.name}'>${upload.data}</doc>\n\n`
fileUploads[i] = omit(upload, ['data'])
}
}
}
@@ -229,7 +238,8 @@ export const utilBuildChatflow = async (req: Request, isInternal: boolean = fals
edges,
baseURL,
appServer.sseStreamer,
true
true,
uploadedFilesContent
)
}
@@ -345,6 +355,7 @@ export const utilBuildChatflow = async (req: Request, isInternal: boolean = fals
apiMessageId,
componentNodes: appServer.nodesPool.componentNodes,
question: incomingInput.question,
uploadedFilesContent,
chatHistory,
chatId,
sessionId: sessionId ?? '',
@@ -384,7 +395,8 @@ export const utilBuildChatflow = async (req: Request, isInternal: boolean = fals
reactFlowNodes,
incomingInput.question,
chatHistory,
flowData
flowData,
uploadedFilesContent
)
nodeToExecuteData = reactFlowNodeData
@@ -398,6 +410,7 @@ export const utilBuildChatflow = async (req: Request, isInternal: boolean = fals
const nodeInstance = new nodeModule.nodeClass({ sessionId })
isStreamValid = (req.body.streaming === 'true' || req.body.streaming === true) && isStreamValid
const finalQuestion = uploadedFilesContent ? `${uploadedFilesContent}\n\n${incomingInput.question}` : incomingInput.question
const runParams = {
chatId,
@@ -411,7 +424,7 @@ export const utilBuildChatflow = async (req: Request, isInternal: boolean = fals
prependMessages
}
let result = await nodeInstance.run(nodeToExecuteData, incomingInput.question, {
let result = await nodeInstance.run(nodeToExecuteData, finalQuestion, {
...runParams,
...(isStreamValid && { sseStreamer: appServer.sseStreamer, shouldStreamResponse: true })
})
@@ -427,7 +440,7 @@ export const utilBuildChatflow = async (req: Request, isInternal: boolean = fals
role: 'userMessage',
content: incomingInput.question,
chatflowid,
chatType: isInternal ? chatType.INTERNAL : chatType.EXTERNAL,
chatType: isInternal ? ChatType.INTERNAL : ChatType.EXTERNAL,
chatId,
memoryType,
sessionId,
@@ -447,7 +460,7 @@ export const utilBuildChatflow = async (req: Request, isInternal: boolean = fals
role: 'apiMessage',
content: resultText,
chatflowid,
chatType: isInternal ? chatType.INTERNAL : chatType.EXTERNAL,
chatType: isInternal ? ChatType.INTERNAL : ChatType.EXTERNAL,
chatId,
memoryType,
sessionId
@@ -476,7 +489,7 @@ export const utilBuildChatflow = async (req: Request, isInternal: boolean = fals
version: await getAppVersion(),
chatflowId: chatflowid,
chatId,
type: isInternal ? chatType.INTERNAL : chatType.EXTERNAL,
type: isInternal ? ChatType.INTERNAL : ChatType.EXTERNAL,
flowGraph: getTelemetryFlowObj(nodes, edges)
})
@@ -517,7 +530,8 @@ const utilBuildAgentResponse = async (
edges: IReactFlowEdge[],
baseURL?: string,
sseStreamer?: IServerSideEventStreamer,
shouldStreamResponse?: boolean
shouldStreamResponse?: boolean,
uploadedFilesContent?: string
) => {
try {
const appServer = getRunningExpressApp()
@@ -530,7 +544,8 @@ const utilBuildAgentResponse = async (
isInternal,
baseURL,
sseStreamer,
shouldStreamResponse
shouldStreamResponse,
uploadedFilesContent
)
if (streamResults) {
const { finalResult, finalAction, sourceDocuments, artifacts, usedTools, agentReasoning } = streamResults
@@ -538,7 +553,7 @@ const utilBuildAgentResponse = async (
role: 'userMessage',
content: incomingInput.question,
chatflowid: agentflow.id,
chatType: isInternal ? chatType.INTERNAL : chatType.EXTERNAL,
chatType: isInternal ? ChatType.INTERNAL : ChatType.EXTERNAL,
chatId,
memoryType,
sessionId,
@@ -553,7 +568,7 @@ const utilBuildAgentResponse = async (
role: 'apiMessage',
content: finalResult,
chatflowid: agentflow.id,
chatType: isInternal ? chatType.INTERNAL : chatType.EXTERNAL,
chatType: isInternal ? ChatType.INTERNAL : ChatType.EXTERNAL,
chatId,
memoryType,
sessionId
@@ -581,7 +596,7 @@ const utilBuildAgentResponse = async (
version: await getAppVersion(),
agentflowId: agentflow.id,
chatId,
type: isInternal ? chatType.INTERNAL : chatType.EXTERNAL,
type: isInternal ? ChatType.INTERNAL : ChatType.EXTERNAL,
flowGraph: getTelemetryFlowObj(nodes, edges)
})
@@ -0,0 +1,84 @@
import { Request } from 'express'
import * as path from 'path'
import * as fs from 'fs'
import { addArrayFilesToStorage, IDocument, mapExtToInputField, mapMimeTypeToInputField } from 'flowise-components'
import { getRunningExpressApp } from './getRunningExpressApp'
import { getErrorMessage } from '../errors/utils'
/**
* Create attachment
* @param {Request} req
*/
export const createFileAttachment = async (req: Request) => {
const appServer = getRunningExpressApp()
const chatflowid = req.params.chatflowId
if (!chatflowid) {
throw new Error(
'Params chatflowId is required! Please provide chatflowId and chatId in the URL: /api/v1/attachments/:chatflowId/:chatId'
)
}
const chatId = req.params.chatId
if (!chatId) {
throw new Error(
'Params chatId is required! Please provide chatflowId and chatId in the URL: /api/v1/attachments/:chatflowId/:chatId'
)
}
// Find FileLoader node
const fileLoaderComponent = appServer.nodesPool.componentNodes['fileLoader']
const fileLoaderNodeInstanceFilePath = fileLoaderComponent.filePath as string
const fileLoaderNodeModule = await import(fileLoaderNodeInstanceFilePath)
const fileLoaderNodeInstance = new fileLoaderNodeModule.nodeClass()
const options = {
retrieveAttachmentChatId: true,
chatflowid,
chatId
}
const files = (req.files as Express.Multer.File[]) || []
const fileAttachments = []
if (files.length) {
for (const file of files) {
const fileBuffer = fs.readFileSync(file.path)
const fileNames: string[] = []
const storagePath = await addArrayFilesToStorage(file.mimetype, fileBuffer, file.originalname, fileNames, chatflowid, chatId)
const fileInputFieldFromMimeType = mapMimeTypeToInputField(file.mimetype)
const fileExtension = path.extname(file.originalname)
const fileInputFieldFromExt = mapExtToInputField(fileExtension)
let fileInputField = 'txtFile'
if (fileInputFieldFromExt !== 'txtFile') {
fileInputField = fileInputFieldFromExt
} else if (fileInputFieldFromMimeType !== 'txtFile') {
fileInputField = fileInputFieldFromExt
}
fs.unlinkSync(file.path)
try {
const nodeData = {
inputs: {
[fileInputField]: storagePath
}
}
const documents: IDocument[] = await fileLoaderNodeInstance.init(nodeData, '', options)
const pageContents = documents.map((doc) => doc.pageContent).join('\n')
fileAttachments.push({
name: file.originalname,
mimeType: file.mimetype,
size: file.size,
content: pageContents
})
} catch (error) {
throw new Error(`Failed operation: createFileAttachment - ${getErrorMessage(error)}`)
}
}
}
return fileAttachments
}
+4 -17
View File
@@ -1,13 +1,14 @@
import { MoreThanOrEqual, LessThanOrEqual } from 'typeorm'
import { ChatMessageRatingType, chatType } from '../Interface'
import { ChatMessageRatingType, ChatType } from '../Interface'
import { ChatMessage } from '../database/entities/ChatMessage'
import { ChatMessageFeedback } from '../database/entities/ChatMessageFeedback'
import { getRunningExpressApp } from '../utils/getRunningExpressApp'
import { aMonthAgo, setDateToStartOrEndOfDay } from '.'
/**
* Method that get chat messages.
* @param {string} chatflowid
* @param {chatType} chatType
* @param {ChatType} chatType
* @param {string} sortOrder
* @param {string} chatId
* @param {string} memoryType
@@ -19,7 +20,7 @@ import { getRunningExpressApp } from '../utils/getRunningExpressApp'
*/
export const utilGetChatMessage = async (
chatflowid: string,
chatType: chatType | undefined,
chatType: ChatType | undefined,
sortOrder: string = 'ASC',
chatId?: string,
memoryType?: string,
@@ -31,20 +32,6 @@ export const utilGetChatMessage = async (
feedbackTypes?: ChatMessageRatingType[]
): Promise<ChatMessage[]> => {
const appServer = getRunningExpressApp()
const setDateToStartOrEndOfDay = (dateTimeStr: string, setHours: 'start' | 'end') => {
const date = new Date(dateTimeStr)
if (isNaN(date.getTime())) {
return undefined
}
setHours === 'start' ? date.setHours(0, 0, 0, 0) : date.setHours(23, 59, 59, 999)
return date
}
const aMonthAgo = () => {
const date = new Date()
date.setMonth(new Date().getMonth() - 1)
return date
}
let fromDate
if (startDate) fromDate = setDateToStartOrEndOfDay(startDate, 'start')
@@ -8,7 +8,7 @@ import { InternalFlowiseError } from '../errors/internalFlowiseError'
type IUploadConfig = {
isSpeechToTextEnabled: boolean
isImageUploadAllowed: boolean
isFileUploadAllowed: boolean
isRAGFileUploadAllowed: boolean
imgUploadSizeAndTypes: IUploadFileSizeAndTypes[]
fileUploadSizeAndTypes: IUploadFileSizeAndTypes[]
}
@@ -32,7 +32,7 @@ export const utilGetUploadsConfig = async (chatflowid: string): Promise<IUploadC
let isSpeechToTextEnabled = false
let isImageUploadAllowed = false
let isFileUploadAllowed = false
let isRAGFileUploadAllowed = false
/*
* Check for STT
@@ -51,7 +51,7 @@ export const utilGetUploadsConfig = async (chatflowid: string): Promise<IUploadC
}
/*
* Condition for isFileUploadAllowed
* Condition for isRAGFileUploadAllowed
* 1.) vector store with fileUpload = true && connected to a document loader with fileType
*/
const fileUploadSizeAndTypes: IUploadFileSizeAndTypes[] = []
@@ -70,7 +70,7 @@ export const utilGetUploadsConfig = async (chatflowid: string): Promise<IUploadC
fileTypes: fileType.split(', '),
maxUploadSize: 500
})
isFileUploadAllowed = true
isRAGFileUploadAllowed = true
}
}
break
@@ -114,7 +114,7 @@ export const utilGetUploadsConfig = async (chatflowid: string): Promise<IUploadC
return {
isSpeechToTextEnabled,
isImageUploadAllowed,
isFileUploadAllowed,
isRAGFileUploadAllowed,
imgUploadSizeAndTypes,
fileUploadSizeAndTypes
}
+34 -6
View File
@@ -48,6 +48,7 @@ import { InternalFlowiseError } from '../errors/internalFlowiseError'
import { StatusCodes } from 'http-status-codes'
const QUESTION_VAR_PREFIX = 'question'
const FILE_ATTACHMENT_PREFIX = 'file_attachment'
const CHAT_HISTORY_VAR_PREFIX = 'chat_history'
const REDACTED_CREDENTIAL_VALUE = '_FLOWISE_BLANK_07167752-1a71-43b1-bf8f-4f32252165db'
@@ -438,6 +439,7 @@ type BuildFlowParams = {
stopNodeId?: string
uploads?: IFileUpload[]
baseURL?: string
uploadedFilesContent?: string
}
/**
@@ -452,6 +454,7 @@ export const buildFlow = async ({
depthQueue,
componentNodes,
question,
uploadedFilesContent,
chatHistory,
apiMessageId,
chatId,
@@ -516,7 +519,8 @@ export const buildFlow = async ({
flowNodes,
question,
chatHistory,
flowData
flowData,
uploadedFilesContent
)
if (isUpsert && stopNodeId && nodeId === stopNodeId) {
@@ -546,7 +550,8 @@ export const buildFlow = async ({
initializedNodes.add(nodeId)
} else {
logger.debug(`[server]: Initializing ${reactFlowNode.data.label} (${reactFlowNode.data.id})`)
let outputResult = await newNodeInstance.init(reactFlowNodeData, question, {
const finalQuestion = uploadedFilesContent ? `${uploadedFilesContent}\n\n${question}` : question
let outputResult = await newNodeInstance.init(reactFlowNodeData, finalQuestion, {
chatId,
sessionId,
chatflowid,
@@ -770,7 +775,8 @@ export const getVariableValue = async (
question: string,
chatHistory: IMessage[],
isAcceptVariable = false,
flowData?: ICommonObject
flowData?: ICommonObject,
uploadedFilesContent?: string
) => {
const isObject = typeof paramValue === 'object'
const initialValue = (isObject ? JSON.stringify(paramValue) : paramValue) ?? ''
@@ -803,6 +809,10 @@ export const getVariableValue = async (
variableDict[`{{${variableFullPath}}}`] = handleEscapeCharacters(question, false)
}
if (isAcceptVariable && variableFullPath === FILE_ATTACHMENT_PREFIX) {
variableDict[`{{${variableFullPath}}}`] = handleEscapeCharacters(uploadedFilesContent, false)
}
if (isAcceptVariable && variableFullPath === CHAT_HISTORY_VAR_PREFIX) {
variableDict[`{{${variableFullPath}}}`] = handleEscapeCharacters(convertChatHistoryToText(chatHistory), false)
}
@@ -916,7 +926,8 @@ export const resolveVariables = async (
reactFlowNodes: IReactFlowNode[],
question: string,
chatHistory: IMessage[],
flowData?: ICommonObject
flowData?: ICommonObject,
uploadedFilesContent?: string
): Promise<INodeData> => {
let flowNodeData = cloneDeep(reactFlowNodeData)
const types = 'inputs'
@@ -934,7 +945,8 @@ export const resolveVariables = async (
question,
chatHistory,
undefined,
flowData
flowData,
uploadedFilesContent
)
resolvedInstances.push(resolvedInstance)
}
@@ -948,7 +960,8 @@ export const resolveVariables = async (
question,
chatHistory,
isAcceptVariable,
flowData
flowData,
uploadedFilesContent
)
paramsObj[key] = resolvedInstance
}
@@ -1572,3 +1585,18 @@ export const convertToValidFilename = (word: string) => {
.replace(' ', '')
.toLowerCase()
}
export const setDateToStartOrEndOfDay = (dateTimeStr: string, setHours: 'start' | 'end') => {
const date = new Date(dateTimeStr)
if (isNaN(date.getTime())) {
return undefined
}
setHours === 'start' ? date.setHours(0, 0, 0, 0) : date.setHours(23, 59, 59, 999)
return date
}
export const aMonthAgo = () => {
const date = new Date()
date.setMonth(new Date().getMonth() - 1)
return date
}
+2 -2
View File
@@ -16,7 +16,7 @@ import {
getStartingNodes
} from '../utils'
import { validateChatflowAPIKey } from './validateKey'
import { IncomingInput, INodeDirectedGraph, IReactFlowObject, chatType } from '../Interface'
import { IncomingInput, INodeDirectedGraph, IReactFlowObject, ChatType } from '../Interface'
import { ChatFlow } from '../database/entities/ChatFlow'
import { getRunningExpressApp } from '../utils/getRunningExpressApp'
import { UpsertHistory } from '../database/entities/UpsertHistory'
@@ -195,7 +195,7 @@ export const upsertVector = async (req: Request, isInternal: boolean = false) =>
data: {
version: await getAppVersion(),
chatlowId: chatflowid,
type: isInternal ? chatType.INTERNAL : chatType.EXTERNAL,
type: isInternal ? ChatType.INTERNAL : ChatType.EXTERNAL,
flowGraph: getTelemetryFlowObj(nodes, edges),
stopNodeId
}