From b5e502f3b6c8272681265250999085dc61b62026 Mon Sep 17 00:00:00 2001 From: Henry Heng Date: Wed, 15 May 2024 19:41:37 +0100 Subject: [PATCH] Feature/Multer to s3 (#2408) * add ability to store files from multer to s3 * add check to bypass doc loader --- .../agents/OpenAIAssistant/OpenAIAssistant.ts | 6 +-- packages/components/src/storageUtils.ts | 43 +++++++++++++++-- .../src/services/documentstore/index.ts | 4 +- packages/server/src/utils/buildChatflow.ts | 21 +++++---- packages/server/src/utils/index.ts | 47 ++++++++++++++++++- packages/server/src/utils/upsertVector.ts | 19 ++++---- 6 files changed, 109 insertions(+), 31 deletions(-) diff --git a/packages/components/nodes/agents/OpenAIAssistant/OpenAIAssistant.ts b/packages/components/nodes/agents/OpenAIAssistant/OpenAIAssistant.ts index 7658a83d..cbc0da72 100644 --- a/packages/components/nodes/agents/OpenAIAssistant/OpenAIAssistant.ts +++ b/packages/components/nodes/agents/OpenAIAssistant/OpenAIAssistant.ts @@ -8,7 +8,7 @@ import { zodToJsonSchema } from 'zod-to-json-schema' import { AnalyticHandler } from '../../../src/handler' import { Moderation, checkInputs, streamResponse } from '../../moderation/Moderation' import { formatResponse } from '../../outputparsers/OutputParserHelpers' -import { addFileToStorage } from '../../../src/storageUtils' +import { addSingleFileToStorage } from '../../../src/storageUtils' const lenticularBracketRegex = /【[^】]*】/g const imageRegex = /]*\/>/g @@ -731,7 +731,7 @@ const downloadImg = async (openai: OpenAI, fileId: string, fileName: string, ... const image_data_buffer = Buffer.from(image_data) const mime = 'image/png' - await addFileToStorage(mime, image_data_buffer, fileName, ...paths) + await addSingleFileToStorage(mime, image_data_buffer, fileName, ...paths) return image_data_buffer } @@ -754,7 +754,7 @@ const downloadFile = async (openAIApiKey: string, fileObj: any, fileName: string const data_buffer = Buffer.from(data) const mime = 'application/octet-stream' - return await addFileToStorage(mime, data_buffer, fileName, ...paths) + return await addSingleFileToStorage(mime, data_buffer, fileName, ...paths) } catch (error) { console.error('Error downloading or writing the file:', error) return '' diff --git a/packages/components/src/storageUtils.ts b/packages/components/src/storageUtils.ts index 2b7a254f..7bacec17 100644 --- a/packages/components/src/storageUtils.ts +++ b/packages/components/src/storageUtils.ts @@ -4,12 +4,12 @@ import { DeleteObjectsCommand, GetObjectCommand, ListObjectsV2Command, PutObject import { Readable } from 'node:stream' import { getUserHome } from './utils' -export const addBase64FilesToStorage = async (file: string, chatflowid: string, fileNames: string[]) => { +export const addBase64FilesToStorage = async (fileBase64: string, chatflowid: string, fileNames: string[]) => { const storageType = getStorageType() if (storageType === 's3') { const { s3Client, Bucket } = getS3Config() - const splitDataURI = file.split(',') + const splitDataURI = fileBase64.split(',') const filename = splitDataURI.pop()?.split(':')[1] ?? '' const bf = Buffer.from(splitDataURI.pop() || '', 'base64') const mime = splitDataURI[0].split(':')[1].split(';')[0] @@ -32,7 +32,7 @@ export const addBase64FilesToStorage = async (file: string, chatflowid: string, fs.mkdirSync(dir, { recursive: true }) } - const splitDataURI = file.split(',') + const splitDataURI = fileBase64.split(',') const filename = splitDataURI.pop()?.split(':')[1] ?? '' const bf = Buffer.from(splitDataURI.pop() || '', 'base64') @@ -43,7 +43,40 @@ export const addBase64FilesToStorage = async (file: string, chatflowid: string, } } -export const addFileToStorage = async (mime: string, bf: Buffer, fileName: string, ...paths: string[]) => { +export const addArrayFilesToStorage = async (mime: string, bf: Buffer, fileName: string, fileNames: string[], ...paths: string[]) => { + const storageType = getStorageType() + if (storageType === 's3') { + const { s3Client, Bucket } = getS3Config() + + let Key = paths.reduce((acc, cur) => acc + '/' + cur, '') + '/' + fileName + if (Key.startsWith('/')) { + Key = Key.substring(1) + } + + const putObjCmd = new PutObjectCommand({ + Bucket, + Key, + ContentEncoding: 'base64', // required for binary data + ContentType: mime, + Body: bf + }) + await s3Client.send(putObjCmd) + fileNames.push(fileName) + return 'FILE-STORAGE::' + JSON.stringify(fileNames) + } else { + const dir = path.join(getStoragePath(), ...paths) + if (!fs.existsSync(dir)) { + fs.mkdirSync(dir, { recursive: true }) + } + + const filePath = path.join(dir, fileName) + fs.writeFileSync(filePath, bf) + fileNames.push(fileName) + return 'FILE-STORAGE::' + JSON.stringify(fileNames) + } +} + +export const addSingleFileToStorage = async (mime: string, bf: Buffer, fileName: string, ...paths: string[]) => { const storageType = getStorageType() if (storageType === 's3') { const { s3Client, Bucket } = getS3Config() @@ -273,7 +306,7 @@ export const streamStorageFile = async ( } } -const getS3Config = () => { +export const getS3Config = () => { const accessKeyId = process.env.S3_STORAGE_ACCESS_KEY_ID const secretAccessKey = process.env.S3_STORAGE_SECRET_ACCESS_KEY const region = process.env.S3_STORAGE_REGION diff --git a/packages/server/src/services/documentstore/index.ts b/packages/server/src/services/documentstore/index.ts index d19860d4..2367a933 100644 --- a/packages/server/src/services/documentstore/index.ts +++ b/packages/server/src/services/documentstore/index.ts @@ -2,7 +2,7 @@ import { getRunningExpressApp } from '../../utils/getRunningExpressApp' import { DocumentStore } from '../../database/entities/DocumentStore' // @ts-ignore import { - addFileToStorage, + addSingleFileToStorage, getFileFromStorage, ICommonObject, IDocument, @@ -343,7 +343,7 @@ const _saveFileToStorage = async (fileBase64: string, entity: DocumentStore) => if (mimePrefix) { mime = mimePrefix.split(';')[0].split(':')[1] } - await addFileToStorage(mime, bf, filename, DOCUMENT_STORE_BASE_FOLDER, entity.id) + await addSingleFileToStorage(mime, bf, filename, DOCUMENT_STORE_BASE_FOLDER, entity.id) return { id: uuidv4(), name: filename, diff --git a/packages/server/src/utils/buildChatflow.ts b/packages/server/src/utils/buildChatflow.ts index df946f98..ae452f2c 100644 --- a/packages/server/src/utils/buildChatflow.ts +++ b/packages/server/src/utils/buildChatflow.ts @@ -1,5 +1,5 @@ import { Request } from 'express' -import { IFileUpload, convertSpeechToText, ICommonObject, addFileToStorage } from 'flowise-components' +import { IFileUpload, convertSpeechToText, ICommonObject, addSingleFileToStorage, addArrayFilesToStorage } from 'flowise-components' import { StatusCodes } from 'http-status-codes' import { IncomingInput, IMessage, INodeData, IReactFlowObject, IReactFlowNode, IDepthQueue, chatType, IChatMessage } from '../Interface' import { InternalFlowiseError } from '../errors/internalFlowiseError' @@ -71,7 +71,7 @@ export const utilBuildChatflow = async (req: Request, socketIO?: Server, isInter const splitDataURI = upload.data.split(',') const bf = Buffer.from(splitDataURI.pop() || '', 'base64') const mime = splitDataURI[0].split(':')[1].split(';')[0] - await addFileToStorage(mime, bf, filename, chatflowid, chatId) + await addSingleFileToStorage(mime, bf, filename, chatflowid, chatId) upload.type = 'stored-file' // Omit upload.data since we don't store the content in database fileUploads[i] = omit(upload, ['data']) @@ -111,20 +111,21 @@ export const utilBuildChatflow = async (req: Request, socketIO?: Server, isInter let isStreamValid = false - const files = (req.files as any[]) || [] + const files = (req.files as Express.Multer.File[]) || [] if (files.length) { const overrideConfig: ICommonObject = { ...req.body } + const fileNames: string[] = [] for (const file of files) { - const fileData = fs.readFileSync(file.path, { encoding: 'base64' }) - const dataBase64String = `data:${file.mimetype};base64,${fileData},filename:${file.filename}` + const fileBuffer = fs.readFileSync(file.path) + + const storagePath = await addArrayFilesToStorage(file.mimetype, fileBuffer, file.originalname, fileNames, chatflowid) const fileInputField = mapMimeTypeToInputField(file.mimetype) - if (overrideConfig[fileInputField]) { - overrideConfig[fileInputField] = JSON.stringify([...JSON.parse(overrideConfig[fileInputField]), dataBase64String]) - } else { - overrideConfig[fileInputField] = JSON.stringify([dataBase64String]) - } + + overrideConfig[fileInputField] = storagePath + + fs.unlinkSync(file.path) } incomingInput = { question: req.body.question ?? 'hello', diff --git a/packages/server/src/utils/index.ts b/packages/server/src/utils/index.ts index e176fd62..aa0c03e8 100644 --- a/packages/server/src/utils/index.ts +++ b/packages/server/src/utils/index.ts @@ -374,6 +374,44 @@ export const saveUpsertFlowData = (nodeData: INodeData, upsertHistory: Record { + let outputId = '' + + if (reactFlowNode.data.outputAnchors.length) { + if (Object.keys(reactFlowNode.data.outputs || {}).length) { + const output = reactFlowNode.data.outputs?.output + const node = reactFlowNode.data.outputAnchors[0].options?.find((anchor) => anchor.name === output) + if (node) outputId = (node as ICommonObject).id + } else { + outputId = (reactFlowNode.data.outputAnchors[0] as ICommonObject).id + } + } + + const targetNodeId = reactFlowEdges.find((edge) => edge.sourceHandle === outputId)?.target + + if (targetNodeId) { + const targetNodeCategory = reactFlowNodes.find((nd) => nd.id === targetNodeId)?.data.category || '' + if (targetNodeCategory === 'Vector Stores') { + return true + } + } + + return false +} + /** * Build langchain from start to end * @param {string[]} startingNodeIds @@ -446,7 +484,6 @@ export const buildFlow = async ( const reactFlowNodeData: INodeData = resolveVariables(flowNodeData, flowNodes, question, chatHistory) - // TODO: Avoid processing Text Splitter + Doc Loader once Upsert & Load Existing Vector Nodes are deprecated if (isUpsert && stopNodeId && nodeId === stopNodeId) { logger.debug(`[server]: Upserting ${reactFlowNode.data.label} (${reactFlowNode.data.id})`) const indexResult = await newNodeInstance.vectorStoreMethods!['upsert']!.call(newNodeInstance, reactFlowNodeData, { @@ -464,6 +501,12 @@ export const buildFlow = async ( if (indexResult) upsertHistory['result'] = indexResult logger.debug(`[server]: Finished upserting ${reactFlowNode.data.label} (${reactFlowNode.data.id})`) break + } else if ( + !isUpsert && + reactFlowNode.data.category === 'Document Loaders' && + checkIfDocLoaderShouldBeIgnored(reactFlowNode, reactFlowNodes, reactFlowEdges) + ) { + initializedNodes.add(nodeId) } else { logger.debug(`[server]: Initializing ${reactFlowNode.data.label} (${reactFlowNode.data.id})`) let outputResult = await newNodeInstance.init(reactFlowNodeData, question, { @@ -935,7 +978,7 @@ export const mapMimeTypeToInputField = (mimeType: string) => { case 'text/yaml': return 'yamlFile' default: - return '' + return 'txtFile' } } diff --git a/packages/server/src/utils/upsertVector.ts b/packages/server/src/utils/upsertVector.ts index 73c6e3fb..175b7168 100644 --- a/packages/server/src/utils/upsertVector.ts +++ b/packages/server/src/utils/upsertVector.ts @@ -1,7 +1,7 @@ import { Request } from 'express' import * as fs from 'fs' import { cloneDeep, omit } from 'lodash' -import { ICommonObject, IMessage } from 'flowise-components' +import { ICommonObject, IMessage, addArrayFilesToStorage } from 'flowise-components' import telemetryService from '../services/telemetry' import logger from '../utils/logger' import { @@ -48,20 +48,21 @@ export const upsertVector = async (req: Request, isInternal: boolean = false) => } } - const files = (req.files as any[]) || [] + const files = (req.files as Express.Multer.File[]) || [] if (files.length) { const overrideConfig: ICommonObject = { ...req.body } + const fileNames: string[] = [] for (const file of files) { - const fileData = fs.readFileSync(file.path, { encoding: 'base64' }) - const dataBase64String = `data:${file.mimetype};base64,${fileData},filename:${file.filename}` + const fileBuffer = fs.readFileSync(file.path) + + const storagePath = await addArrayFilesToStorage(file.mimetype, fileBuffer, file.originalname, fileNames, chatflowid) const fileInputField = mapMimeTypeToInputField(file.mimetype) - if (overrideConfig[fileInputField]) { - overrideConfig[fileInputField] = JSON.stringify([...JSON.parse(overrideConfig[fileInputField]), dataBase64String]) - } else { - overrideConfig[fileInputField] = JSON.stringify([dataBase64String]) - } + + overrideConfig[fileInputField] = storagePath + + fs.unlinkSync(file.path) } incomingInput = { question: req.body.question ?? 'hello',