mirror of
https://github.com/farcasclaudiu/Flowise.git
synced 2026-06-29 09:01:06 +03:00
Feature/Multer to s3 (#2408)
* add ability to store files from multer to s3 * add check to bypass doc loader
This commit is contained in:
@@ -8,7 +8,7 @@ import { zodToJsonSchema } from 'zod-to-json-schema'
|
|||||||
import { AnalyticHandler } from '../../../src/handler'
|
import { AnalyticHandler } from '../../../src/handler'
|
||||||
import { Moderation, checkInputs, streamResponse } from '../../moderation/Moderation'
|
import { Moderation, checkInputs, streamResponse } from '../../moderation/Moderation'
|
||||||
import { formatResponse } from '../../outputparsers/OutputParserHelpers'
|
import { formatResponse } from '../../outputparsers/OutputParserHelpers'
|
||||||
import { addFileToStorage } from '../../../src/storageUtils'
|
import { addSingleFileToStorage } from '../../../src/storageUtils'
|
||||||
|
|
||||||
const lenticularBracketRegex = /【[^】]*】/g
|
const lenticularBracketRegex = /【[^】]*】/g
|
||||||
const imageRegex = /<img[^>]*\/>/g
|
const imageRegex = /<img[^>]*\/>/g
|
||||||
@@ -731,7 +731,7 @@ const downloadImg = async (openai: OpenAI, fileId: string, fileName: string, ...
|
|||||||
const image_data_buffer = Buffer.from(image_data)
|
const image_data_buffer = Buffer.from(image_data)
|
||||||
const mime = 'image/png'
|
const mime = 'image/png'
|
||||||
|
|
||||||
await addFileToStorage(mime, image_data_buffer, fileName, ...paths)
|
await addSingleFileToStorage(mime, image_data_buffer, fileName, ...paths)
|
||||||
|
|
||||||
return image_data_buffer
|
return image_data_buffer
|
||||||
}
|
}
|
||||||
@@ -754,7 +754,7 @@ const downloadFile = async (openAIApiKey: string, fileObj: any, fileName: string
|
|||||||
const data_buffer = Buffer.from(data)
|
const data_buffer = Buffer.from(data)
|
||||||
const mime = 'application/octet-stream'
|
const mime = 'application/octet-stream'
|
||||||
|
|
||||||
return await addFileToStorage(mime, data_buffer, fileName, ...paths)
|
return await addSingleFileToStorage(mime, data_buffer, fileName, ...paths)
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error('Error downloading or writing the file:', error)
|
console.error('Error downloading or writing the file:', error)
|
||||||
return ''
|
return ''
|
||||||
|
|||||||
@@ -4,12 +4,12 @@ import { DeleteObjectsCommand, GetObjectCommand, ListObjectsV2Command, PutObject
|
|||||||
import { Readable } from 'node:stream'
|
import { Readable } from 'node:stream'
|
||||||
import { getUserHome } from './utils'
|
import { getUserHome } from './utils'
|
||||||
|
|
||||||
export const addBase64FilesToStorage = async (file: string, chatflowid: string, fileNames: string[]) => {
|
export const addBase64FilesToStorage = async (fileBase64: string, chatflowid: string, fileNames: string[]) => {
|
||||||
const storageType = getStorageType()
|
const storageType = getStorageType()
|
||||||
if (storageType === 's3') {
|
if (storageType === 's3') {
|
||||||
const { s3Client, Bucket } = getS3Config()
|
const { s3Client, Bucket } = getS3Config()
|
||||||
|
|
||||||
const splitDataURI = file.split(',')
|
const splitDataURI = fileBase64.split(',')
|
||||||
const filename = splitDataURI.pop()?.split(':')[1] ?? ''
|
const filename = splitDataURI.pop()?.split(':')[1] ?? ''
|
||||||
const bf = Buffer.from(splitDataURI.pop() || '', 'base64')
|
const bf = Buffer.from(splitDataURI.pop() || '', 'base64')
|
||||||
const mime = splitDataURI[0].split(':')[1].split(';')[0]
|
const mime = splitDataURI[0].split(':')[1].split(';')[0]
|
||||||
@@ -32,7 +32,7 @@ export const addBase64FilesToStorage = async (file: string, chatflowid: string,
|
|||||||
fs.mkdirSync(dir, { recursive: true })
|
fs.mkdirSync(dir, { recursive: true })
|
||||||
}
|
}
|
||||||
|
|
||||||
const splitDataURI = file.split(',')
|
const splitDataURI = fileBase64.split(',')
|
||||||
const filename = splitDataURI.pop()?.split(':')[1] ?? ''
|
const filename = splitDataURI.pop()?.split(':')[1] ?? ''
|
||||||
const bf = Buffer.from(splitDataURI.pop() || '', 'base64')
|
const bf = Buffer.from(splitDataURI.pop() || '', 'base64')
|
||||||
|
|
||||||
@@ -43,7 +43,40 @@ export const addBase64FilesToStorage = async (file: string, chatflowid: string,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export const addFileToStorage = async (mime: string, bf: Buffer, fileName: string, ...paths: string[]) => {
|
/**
 * Store one uploaded file and append its name to a running list of stored file names.
 *
 * Depending on `getStorageType()`, the buffer is either uploaded to S3 or written to the
 * local storage folder. The (sanitized) file name is pushed onto `fileNames`, which is
 * mutated in place so that repeated calls accumulate every file of a multi-file upload.
 *
 * @param {string} mime - Content type recorded for the object when uploading to S3
 * @param {Buffer} bf - Raw file content
 * @param {string} fileName - Name to store the file under; only its final path component is used
 * @param {string[]} fileNames - Accumulator of stored file names (mutated)
 * @param {string[]} paths - Folder segments (local) / key prefix segments (S3) to store under
 * @returns {Promise<string>} `FILE-STORAGE::` followed by the JSON-encoded `fileNames` array
 * @throws {Error} When `fileName` has no usable final path component (empty, `.` or `..`)
 */
export const addArrayFilesToStorage = async (mime: string, bf: Buffer, fileName: string, fileNames: string[], ...paths: string[]) => {
    // The name can come straight from a client upload. Keep only the last path component
    // (treating both `/` and `\` as separators) so that something like `../../x` cannot
    // escape the target folder or key prefix.
    const safeFileName = path.basename(fileName.replace(/\\/g, '/'))
    if (!safeFileName || safeFileName === '.' || safeFileName === '..') {
        throw new Error(`Invalid file name: ${fileName}`)
    }

    if (getStorageType() === 's3') {
        const { s3Client, Bucket } = getS3Config()

        // Same key as prefixing every segment with '/' and then dropping the leading slash
        const Key = [...paths, safeFileName].join('/')

        const putObjCmd = new PutObjectCommand({
            Bucket,
            Key,
            // NOTE(review): Body is a raw Buffer rather than base64 text - confirm this header is intended; kept unchanged
            ContentEncoding: 'base64', // required for binary data
            ContentType: mime,
            Body: bf
        })
        await s3Client.send(putObjCmd)
    } else {
        const dir = path.join(getStoragePath(), ...paths)
        if (!fs.existsSync(dir)) {
            fs.mkdirSync(dir, { recursive: true })
        }

        fs.writeFileSync(path.join(dir, safeFileName), bf)
    }

    // Record the name only after the write/upload succeeded
    fileNames.push(safeFileName)
    return 'FILE-STORAGE::' + JSON.stringify(fileNames)
}
|
||||||
|
|
||||||
|
export const addSingleFileToStorage = async (mime: string, bf: Buffer, fileName: string, ...paths: string[]) => {
|
||||||
const storageType = getStorageType()
|
const storageType = getStorageType()
|
||||||
if (storageType === 's3') {
|
if (storageType === 's3') {
|
||||||
const { s3Client, Bucket } = getS3Config()
|
const { s3Client, Bucket } = getS3Config()
|
||||||
@@ -273,7 +306,7 @@ export const streamStorageFile = async (
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const getS3Config = () => {
|
export const getS3Config = () => {
|
||||||
const accessKeyId = process.env.S3_STORAGE_ACCESS_KEY_ID
|
const accessKeyId = process.env.S3_STORAGE_ACCESS_KEY_ID
|
||||||
const secretAccessKey = process.env.S3_STORAGE_SECRET_ACCESS_KEY
|
const secretAccessKey = process.env.S3_STORAGE_SECRET_ACCESS_KEY
|
||||||
const region = process.env.S3_STORAGE_REGION
|
const region = process.env.S3_STORAGE_REGION
|
||||||
|
|||||||
@@ -2,7 +2,7 @@ import { getRunningExpressApp } from '../../utils/getRunningExpressApp'
|
|||||||
import { DocumentStore } from '../../database/entities/DocumentStore'
|
import { DocumentStore } from '../../database/entities/DocumentStore'
|
||||||
// @ts-ignore
|
// @ts-ignore
|
||||||
import {
|
import {
|
||||||
addFileToStorage,
|
addSingleFileToStorage,
|
||||||
getFileFromStorage,
|
getFileFromStorage,
|
||||||
ICommonObject,
|
ICommonObject,
|
||||||
IDocument,
|
IDocument,
|
||||||
@@ -343,7 +343,7 @@ const _saveFileToStorage = async (fileBase64: string, entity: DocumentStore) =>
|
|||||||
if (mimePrefix) {
|
if (mimePrefix) {
|
||||||
mime = mimePrefix.split(';')[0].split(':')[1]
|
mime = mimePrefix.split(';')[0].split(':')[1]
|
||||||
}
|
}
|
||||||
await addFileToStorage(mime, bf, filename, DOCUMENT_STORE_BASE_FOLDER, entity.id)
|
await addSingleFileToStorage(mime, bf, filename, DOCUMENT_STORE_BASE_FOLDER, entity.id)
|
||||||
return {
|
return {
|
||||||
id: uuidv4(),
|
id: uuidv4(),
|
||||||
name: filename,
|
name: filename,
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
import { Request } from 'express'
|
import { Request } from 'express'
|
||||||
import { IFileUpload, convertSpeechToText, ICommonObject, addFileToStorage } from 'flowise-components'
|
import { IFileUpload, convertSpeechToText, ICommonObject, addSingleFileToStorage, addArrayFilesToStorage } from 'flowise-components'
|
||||||
import { StatusCodes } from 'http-status-codes'
|
import { StatusCodes } from 'http-status-codes'
|
||||||
import { IncomingInput, IMessage, INodeData, IReactFlowObject, IReactFlowNode, IDepthQueue, chatType, IChatMessage } from '../Interface'
|
import { IncomingInput, IMessage, INodeData, IReactFlowObject, IReactFlowNode, IDepthQueue, chatType, IChatMessage } from '../Interface'
|
||||||
import { InternalFlowiseError } from '../errors/internalFlowiseError'
|
import { InternalFlowiseError } from '../errors/internalFlowiseError'
|
||||||
@@ -71,7 +71,7 @@ export const utilBuildChatflow = async (req: Request, socketIO?: Server, isInter
|
|||||||
const splitDataURI = upload.data.split(',')
|
const splitDataURI = upload.data.split(',')
|
||||||
const bf = Buffer.from(splitDataURI.pop() || '', 'base64')
|
const bf = Buffer.from(splitDataURI.pop() || '', 'base64')
|
||||||
const mime = splitDataURI[0].split(':')[1].split(';')[0]
|
const mime = splitDataURI[0].split(':')[1].split(';')[0]
|
||||||
await addFileToStorage(mime, bf, filename, chatflowid, chatId)
|
await addSingleFileToStorage(mime, bf, filename, chatflowid, chatId)
|
||||||
upload.type = 'stored-file'
|
upload.type = 'stored-file'
|
||||||
// Omit upload.data since we don't store the content in database
|
// Omit upload.data since we don't store the content in database
|
||||||
fileUploads[i] = omit(upload, ['data'])
|
fileUploads[i] = omit(upload, ['data'])
|
||||||
@@ -111,20 +111,21 @@ export const utilBuildChatflow = async (req: Request, socketIO?: Server, isInter
|
|||||||
|
|
||||||
let isStreamValid = false
|
let isStreamValid = false
|
||||||
|
|
||||||
const files = (req.files as any[]) || []
|
const files = (req.files as Express.Multer.File[]) || []
|
||||||
|
|
||||||
if (files.length) {
|
if (files.length) {
|
||||||
const overrideConfig: ICommonObject = { ...req.body }
|
const overrideConfig: ICommonObject = { ...req.body }
|
||||||
|
const fileNames: string[] = []
|
||||||
for (const file of files) {
|
for (const file of files) {
|
||||||
const fileData = fs.readFileSync(file.path, { encoding: 'base64' })
|
const fileBuffer = fs.readFileSync(file.path)
|
||||||
const dataBase64String = `data:${file.mimetype};base64,${fileData},filename:${file.filename}`
|
|
||||||
|
const storagePath = await addArrayFilesToStorage(file.mimetype, fileBuffer, file.originalname, fileNames, chatflowid)
|
||||||
|
|
||||||
const fileInputField = mapMimeTypeToInputField(file.mimetype)
|
const fileInputField = mapMimeTypeToInputField(file.mimetype)
|
||||||
if (overrideConfig[fileInputField]) {
|
|
||||||
overrideConfig[fileInputField] = JSON.stringify([...JSON.parse(overrideConfig[fileInputField]), dataBase64String])
|
overrideConfig[fileInputField] = storagePath
|
||||||
} else {
|
|
||||||
overrideConfig[fileInputField] = JSON.stringify([dataBase64String])
|
fs.unlinkSync(file.path)
|
||||||
}
|
|
||||||
}
|
}
|
||||||
incomingInput = {
|
incomingInput = {
|
||||||
question: req.body.question ?? 'hello',
|
question: req.body.question ?? 'hello',
|
||||||
|
|||||||
@@ -374,6 +374,44 @@ export const saveUpsertFlowData = (nodeData: INodeData, upsertHistory: Record<st
|
|||||||
return existingUpsertFlowData
|
return existingUpsertFlowData
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Tell whether a doc loader node should be bypassed, which is ONLY the case when the
 * doc loader is connected to a vector store.
 * The loader does not need to run again every time the flow is built, because loading
 * was already done during upserting.
 * TODO: Remove this logic when we remove doc loader nodes from the canvas
 * @param {IReactFlowNode} reactFlowNode
 * @param {IReactFlowNode[]} reactFlowNodes
 * @param {IReactFlowEdge[]} reactFlowEdges
 * @returns {boolean}
 */
const checkIfDocLoaderShouldBeIgnored = (
    reactFlowNode: IReactFlowNode,
    reactFlowNodes: IReactFlowNode[],
    reactFlowEdges: IReactFlowEdge[]
): boolean => {
    const { outputAnchors, outputs } = reactFlowNode.data

    // Id of the output handle that edges leaving this node would reference
    let sourceHandleId = ''

    if (outputAnchors.length) {
        const firstAnchor = outputAnchors[0]
        const hasSelectedOutput = Object.keys(outputs || {}).length > 0

        if (hasSelectedOutput) {
            // The node exposes several output options: use the one currently selected
            const selectedName = outputs?.output
            const selectedOption = firstAnchor.options?.find((anchor) => anchor.name === selectedName)
            if (selectedOption) sourceHandleId = (selectedOption as ICommonObject).id
        } else {
            sourceHandleId = (firstAnchor as ICommonObject).id
        }
    }

    // Only the first edge leaving that handle is considered
    const outgoingEdge = reactFlowEdges.find((edge) => edge.sourceHandle === sourceHandleId)
    const targetNodeId = outgoingEdge?.target
    if (!targetNodeId) return false

    const targetNode = reactFlowNodes.find((nd) => nd.id === targetNodeId)
    const targetNodeCategory = targetNode?.data.category || ''
    return targetNodeCategory === 'Vector Stores'
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Build langchain from start to end
|
* Build langchain from start to end
|
||||||
* @param {string[]} startingNodeIds
|
* @param {string[]} startingNodeIds
|
||||||
@@ -446,7 +484,6 @@ export const buildFlow = async (
|
|||||||
|
|
||||||
const reactFlowNodeData: INodeData = resolveVariables(flowNodeData, flowNodes, question, chatHistory)
|
const reactFlowNodeData: INodeData = resolveVariables(flowNodeData, flowNodes, question, chatHistory)
|
||||||
|
|
||||||
// TODO: Avoid processing Text Splitter + Doc Loader once Upsert & Load Existing Vector Nodes are deprecated
|
|
||||||
if (isUpsert && stopNodeId && nodeId === stopNodeId) {
|
if (isUpsert && stopNodeId && nodeId === stopNodeId) {
|
||||||
logger.debug(`[server]: Upserting ${reactFlowNode.data.label} (${reactFlowNode.data.id})`)
|
logger.debug(`[server]: Upserting ${reactFlowNode.data.label} (${reactFlowNode.data.id})`)
|
||||||
const indexResult = await newNodeInstance.vectorStoreMethods!['upsert']!.call(newNodeInstance, reactFlowNodeData, {
|
const indexResult = await newNodeInstance.vectorStoreMethods!['upsert']!.call(newNodeInstance, reactFlowNodeData, {
|
||||||
@@ -464,6 +501,12 @@ export const buildFlow = async (
|
|||||||
if (indexResult) upsertHistory['result'] = indexResult
|
if (indexResult) upsertHistory['result'] = indexResult
|
||||||
logger.debug(`[server]: Finished upserting ${reactFlowNode.data.label} (${reactFlowNode.data.id})`)
|
logger.debug(`[server]: Finished upserting ${reactFlowNode.data.label} (${reactFlowNode.data.id})`)
|
||||||
break
|
break
|
||||||
|
} else if (
|
||||||
|
!isUpsert &&
|
||||||
|
reactFlowNode.data.category === 'Document Loaders' &&
|
||||||
|
checkIfDocLoaderShouldBeIgnored(reactFlowNode, reactFlowNodes, reactFlowEdges)
|
||||||
|
) {
|
||||||
|
initializedNodes.add(nodeId)
|
||||||
} else {
|
} else {
|
||||||
logger.debug(`[server]: Initializing ${reactFlowNode.data.label} (${reactFlowNode.data.id})`)
|
logger.debug(`[server]: Initializing ${reactFlowNode.data.label} (${reactFlowNode.data.id})`)
|
||||||
let outputResult = await newNodeInstance.init(reactFlowNodeData, question, {
|
let outputResult = await newNodeInstance.init(reactFlowNodeData, question, {
|
||||||
@@ -935,7 +978,7 @@ export const mapMimeTypeToInputField = (mimeType: string) => {
|
|||||||
case 'text/yaml':
|
case 'text/yaml':
|
||||||
return 'yamlFile'
|
return 'yamlFile'
|
||||||
default:
|
default:
|
||||||
return ''
|
return 'txtFile'
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
import { Request } from 'express'
|
import { Request } from 'express'
|
||||||
import * as fs from 'fs'
|
import * as fs from 'fs'
|
||||||
import { cloneDeep, omit } from 'lodash'
|
import { cloneDeep, omit } from 'lodash'
|
||||||
import { ICommonObject, IMessage } from 'flowise-components'
|
import { ICommonObject, IMessage, addArrayFilesToStorage } from 'flowise-components'
|
||||||
import telemetryService from '../services/telemetry'
|
import telemetryService from '../services/telemetry'
|
||||||
import logger from '../utils/logger'
|
import logger from '../utils/logger'
|
||||||
import {
|
import {
|
||||||
@@ -48,20 +48,21 @@ export const upsertVector = async (req: Request, isInternal: boolean = false) =>
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const files = (req.files as any[]) || []
|
const files = (req.files as Express.Multer.File[]) || []
|
||||||
|
|
||||||
if (files.length) {
|
if (files.length) {
|
||||||
const overrideConfig: ICommonObject = { ...req.body }
|
const overrideConfig: ICommonObject = { ...req.body }
|
||||||
|
const fileNames: string[] = []
|
||||||
for (const file of files) {
|
for (const file of files) {
|
||||||
const fileData = fs.readFileSync(file.path, { encoding: 'base64' })
|
const fileBuffer = fs.readFileSync(file.path)
|
||||||
const dataBase64String = `data:${file.mimetype};base64,${fileData},filename:${file.filename}`
|
|
||||||
|
const storagePath = await addArrayFilesToStorage(file.mimetype, fileBuffer, file.originalname, fileNames, chatflowid)
|
||||||
|
|
||||||
const fileInputField = mapMimeTypeToInputField(file.mimetype)
|
const fileInputField = mapMimeTypeToInputField(file.mimetype)
|
||||||
if (overrideConfig[fileInputField]) {
|
|
||||||
overrideConfig[fileInputField] = JSON.stringify([...JSON.parse(overrideConfig[fileInputField]), dataBase64String])
|
overrideConfig[fileInputField] = storagePath
|
||||||
} else {
|
|
||||||
overrideConfig[fileInputField] = JSON.stringify([dataBase64String])
|
fs.unlinkSync(file.path)
|
||||||
}
|
|
||||||
}
|
}
|
||||||
incomingInput = {
|
incomingInput = {
|
||||||
question: req.body.question ?? 'hello',
|
question: req.body.question ?? 'hello',
|
||||||
|
|||||||
Reference in New Issue
Block a user