Feature/s3 storage (#2226)

* centralizing file writing....

* allowing s3 as storage option

* allowing s3 as storage option

* update s3 storage

---------

Co-authored-by: Henry <hzj94@hotmail.com>
This commit is contained in:
Vinod Kiran
2024-04-23 16:05:38 +05:30
committed by GitHub
parent 6ab259b6aa
commit 7006d64de0
34 changed files with 458 additions and 257 deletions
+17 -9
View File
@@ -1,20 +1,19 @@
PORT=3000
# CORS_ORIGINS="*"
# IFRAME_ORIGINS="*"
# DATABASE_PATH=/your_database_path/.flowise
# APIKEY_PATH=/your_api_key_path/.flowise
# SECRETKEY_PATH=/your_api_key_path/.flowise
# LOG_PATH=/your_log_path/.flowise/logs
# BLOB_STORAGE_PATH=/your_storage_path/.flowise/storage
# NUMBER_OF_PROXIES= 1
# CORS_ORIGINS=*
# IFRAME_ORIGINS=*
# DATABASE_PATH=/your_database_path/.flowise
# DATABASE_TYPE=postgres
# DATABASE_PORT=""
# DATABASE_PORT=5432
# DATABASE_HOST=""
# DATABASE_NAME="flowise"
# DATABASE_USER=""
# DATABASE_PASSWORD=""
# DATABASE_NAME=flowise
# DATABASE_USER=root
# DATABASE_PASSWORD=mypassword
# DATABASE_SSL=true
# DATABASE_SSL_KEY_BASE64=<Self signed certificate in BASE64>
@@ -22,7 +21,9 @@ PORT=3000
# FLOWISE_PASSWORD=1234
# FLOWISE_SECRETKEY_OVERWRITE=myencryptionkey
# FLOWISE_FILE_SIZE_LIMIT=50mb
# DEBUG=true
# LOG_PATH=/your_log_path/.flowise/logs
# LOG_LEVEL=debug (error | warn | info | verbose | debug)
# TOOL_FUNCTION_BUILTIN_DEP=crypto,fs
# TOOL_FUNCTION_EXTERNAL_DEP=moment,lodash
@@ -37,3 +38,10 @@ PORT=3000
# Uncomment the following line to enable model list config, load the list of models from your local config file
# see https://raw.githubusercontent.com/FlowiseAI/Flowise/main/packages/components/models.json for the format
# MODEL_LIST_CONFIG_JSON=/your_model_list_config_file_path
# STORAGE_TYPE=local (local | s3)
# BLOB_STORAGE_PATH=/your_storage_path/.flowise/storage
# S3_STORAGE_BUCKET_NAME=flowise
# S3_STORAGE_ACCESS_KEY_ID=<your-access-key>
# S3_STORAGE_SECRET_ACCESS_KEY=<your-secret-key>
# S3_STORAGE_REGION=us-west-2
+15 -5
View File
@@ -46,7 +46,12 @@ export default class Start extends Command {
LANGCHAIN_API_KEY: Flags.string(),
LANGCHAIN_PROJECT: Flags.string(),
DISABLE_FLOWISE_TELEMETRY: Flags.string(),
MODEL_LIST_CONFIG_JSON: Flags.string()
MODEL_LIST_CONFIG_JSON: Flags.string(),
STORAGE_TYPE: Flags.string(),
S3_STORAGE_BUCKET_NAME: Flags.string(),
S3_STORAGE_ACCESS_KEY_ID: Flags.string(),
S3_STORAGE_SECRET_ACCESS_KEY: Flags.string(),
S3_STORAGE_REGION: Flags.string()
}
async stopProcess() {
@@ -94,10 +99,7 @@ export default class Start extends Command {
if (flags.FLOWISE_PASSWORD) process.env.FLOWISE_PASSWORD = flags.FLOWISE_PASSWORD
if (flags.APIKEY_PATH) process.env.APIKEY_PATH = flags.APIKEY_PATH
// Storage
if (flags.BLOB_STORAGE_PATH) process.env.BLOB_STORAGE_PATH = flags.BLOB_STORAGE_PATH
//API Configuration
// API Configuration
if (flags.FLOWISE_FILE_SIZE_LIMIT) process.env.FLOWISE_FILE_SIZE_LIMIT = flags.FLOWISE_FILE_SIZE_LIMIT
// Credentials
@@ -138,6 +140,14 @@ export default class Start extends Command {
// Model list config
if (flags.MODEL_LIST_CONFIG_JSON) process.env.MODEL_LIST_CONFIG_JSON = flags.MODEL_LIST_CONFIG_JSON
// Storage
if (flags.STORAGE_TYPE) process.env.STORAGE_TYPE = flags.STORAGE_TYPE
if (flags.BLOB_STORAGE_PATH) process.env.BLOB_STORAGE_PATH = flags.BLOB_STORAGE_PATH
if (flags.S3_STORAGE_BUCKET_NAME) process.env.S3_STORAGE_BUCKET_NAME = flags.S3_STORAGE_BUCKET_NAME
if (flags.S3_STORAGE_ACCESS_KEY_ID) process.env.S3_STORAGE_ACCESS_KEY_ID = flags.S3_STORAGE_ACCESS_KEY_ID
if (flags.S3_STORAGE_SECRET_ACCESS_KEY) process.env.S3_STORAGE_SECRET_ACCESS_KEY = flags.S3_STORAGE_SECRET_ACCESS_KEY
if (flags.S3_STORAGE_REGION) process.env.S3_STORAGE_REGION = flags.S3_STORAGE_REGION
await (async () => {
try {
logger.info('Starting Flowise...')
@@ -1,10 +1,11 @@
import { Request, Response, NextFunction } from 'express'
import path from 'path'
import fs from 'fs'
import contentDisposition from 'content-disposition'
import { getStoragePath } from 'flowise-components'
import * as fs from 'fs'
import { streamStorageFile } from 'flowise-components'
import { StatusCodes } from 'http-status-codes'
import { InternalFlowiseError } from '../../errors/internalFlowiseError'
const streamUploadedImage = async (req: Request, res: Response, next: NextFunction) => {
const streamUploadedFile = async (req: Request, res: Response, next: NextFunction) => {
try {
if (!req.query.chatflowId || !req.query.chatId || !req.query.fileName) {
return res.status(500).send(`Invalid file path`)
@@ -12,20 +13,15 @@ const streamUploadedImage = async (req: Request, res: Response, next: NextFuncti
const chatflowId = req.query.chatflowId as string
const chatId = req.query.chatId as string
const fileName = req.query.fileName as string
const filePath = path.join(getStoragePath(), chatflowId, chatId, fileName)
//raise error if file path is not absolute
if (!path.isAbsolute(filePath)) return res.status(500).send(`Invalid file path`)
//raise error if file path contains '..'
if (filePath.includes('..')) return res.status(500).send(`Invalid file path`)
//only return from the storage folder
if (!filePath.startsWith(getStoragePath())) return res.status(500).send(`Invalid file path`)
res.setHeader('Content-Disposition', contentDisposition(fileName))
const fileStream = await streamStorageFile(chatflowId, chatId, fileName)
if (fs.existsSync(filePath)) {
res.setHeader('Content-Disposition', contentDisposition(path.basename(filePath)))
const fileStream = fs.createReadStream(filePath)
if (!fileStream) throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, `Error: streamStorageFile`)
if (fileStream instanceof fs.ReadStream && fileStream?.pipe) {
fileStream.pipe(res)
} else {
return res.status(404).send(`File ${fileName} not found`)
res.send(fileStream)
}
} catch (error) {
next(error)
@@ -33,5 +29,5 @@ const streamUploadedImage = async (req: Request, res: Response, next: NextFuncti
}
export default {
streamUploadedImage
streamUploadedFile
}
@@ -3,6 +3,6 @@ import getUploadFileController from '../../controllers/get-upload-file'
const router = express.Router()
// READ
router.get('/', getUploadFileController.streamUploadedImage)
router.get('/', getUploadFileController.streamUploadedFile)
export default router
@@ -1,13 +1,11 @@
import { FindOptionsWhere } from 'typeorm'
import path from 'path'
import { StatusCodes } from 'http-status-codes'
import { chatType, IChatMessage } from '../../Interface'
import { utilGetChatMessage } from '../../utils/getChatMessage'
import { utilAddChatMessage } from '../../utils/addChatMesage'
import { getRunningExpressApp } from '../../utils/getRunningExpressApp'
import { ChatMessageFeedback } from '../../database/entities/ChatMessageFeedback'
import { getStoragePath } from 'flowise-components'
import { deleteFolderRecursive } from '../../utils'
import { removeFilesFromStorage } from 'flowise-components'
import logger from '../../utils/logger'
import { ChatMessage } from '../../database/entities/ChatMessage'
import { InternalFlowiseError } from '../../errors/internalFlowiseError'
@@ -100,15 +98,14 @@ const removeAllChatMessages = async (chatId: string, chatflowid: string, deleteO
try {
const appServer = getRunningExpressApp()
// remove all related feedback records
// Remove all related feedback records
const feedbackDeleteOptions: FindOptionsWhere<ChatMessageFeedback> = { chatId }
await appServer.AppDataSource.getRepository(ChatMessageFeedback).delete(feedbackDeleteOptions)
// Delete all uploads corresponding to this chatflow/chatId
if (chatId) {
try {
const directory = path.join(getStoragePath(), chatflowid, chatId)
deleteFolderRecursive(directory)
await removeFilesFromStorage(chatflowid, chatId)
} catch (e) {
logger.error(`[server]: Error deleting file storage for chatflow ${chatflowid}, chatId ${chatId}: ${e}`)
}
@@ -1,19 +1,11 @@
import path from 'path'
import { StatusCodes } from 'http-status-codes'
import { InternalFlowiseError } from '../../errors/internalFlowiseError'
import { getRunningExpressApp } from '../../utils/getRunningExpressApp'
import { IChatFlow } from '../../Interface'
import { ChatFlow } from '../../database/entities/ChatFlow'
import {
getAppVersion,
getTelemetryFlowObj,
deleteFolderRecursive,
isFlowValidForStream,
constructGraphs,
getEndingNodes
} from '../../utils'
import { getAppVersion, getTelemetryFlowObj, isFlowValidForStream, constructGraphs, getEndingNodes } from '../../utils'
import logger from '../../utils/logger'
import { getStoragePath } from 'flowise-components'
import { removeFolderFromStorage } from 'flowise-components'
import { IReactFlowObject } from '../../Interface'
import { utilGetUploadsConfig } from '../../utils/getUploadsConfig'
import { ChatMessage } from '../../database/entities/ChatMessage'
@@ -83,8 +75,7 @@ const deleteChatflow = async (chatflowId: string): Promise<any> => {
const dbResponse = await appServer.AppDataSource.getRepository(ChatFlow).delete({ id: chatflowId })
try {
// Delete all uploads corresponding to this chatflow
const directory = path.join(getStoragePath(), chatflowId)
deleteFolderRecursive(directory)
await removeFolderFromStorage(chatflowId)
// Delete all chat messages
await appServer.AppDataSource.getRepository(ChatMessage).delete({ chatflowid: chatflowId })
@@ -174,7 +165,7 @@ const saveChatflow = async (newChatFlow: ChatFlow): Promise<any> => {
const step1Results = await appServer.AppDataSource.getRepository(ChatFlow).save(chatflow)
// step 2 - convert base64 to file paths and update the chatflow
step1Results.flowData = updateFlowDataWithFilePaths(step1Results.id, incomingFlowData)
step1Results.flowData = await updateFlowDataWithFilePaths(step1Results.id, incomingFlowData)
dbResponse = await appServer.AppDataSource.getRepository(ChatFlow).save(step1Results)
} else {
const chatflow = appServer.AppDataSource.getRepository(ChatFlow).create(newChatFlow)
@@ -198,7 +189,7 @@ const updateChatflow = async (chatflow: ChatFlow, updateChatFlow: ChatFlow): Pro
try {
const appServer = getRunningExpressApp()
if (updateChatFlow.flowData && containsBase64File(updateChatFlow)) {
updateChatFlow.flowData = updateFlowDataWithFilePaths(chatflow.id, updateChatFlow.flowData)
updateChatFlow.flowData = await updateFlowDataWithFilePaths(chatflow.id, updateChatFlow.flowData)
}
const newDbChatflow = appServer.AppDataSource.getRepository(ChatFlow).merge(chatflow, updateChatFlow)
const dbResponse = await appServer.AppDataSource.getRepository(ChatFlow).save(newDbChatflow)
+4 -10
View File
@@ -1,8 +1,7 @@
import { Request } from 'express'
import { IFileUpload, getStoragePath, convertSpeechToText, ICommonObject } from 'flowise-components'
import { IFileUpload, convertSpeechToText, ICommonObject, addFileToStorage } from 'flowise-components'
import { StatusCodes } from 'http-status-codes'
import { IncomingInput, IMessage, INodeData, IReactFlowObject, IReactFlowNode, IDepthQueue, chatType, IChatMessage } from '../Interface'
import path from 'path'
import { InternalFlowiseError } from '../errors/internalFlowiseError'
import { ChatFlow } from '../database/entities/ChatFlow'
import { Server } from 'socket.io'
@@ -69,17 +68,12 @@ export const utilBuildChatflow = async (req: Request, socketIO?: Server, isInter
if ((upload.type === 'file' || upload.type === 'audio') && upload.data) {
const filename = upload.name
const dir = path.join(getStoragePath(), chatflowid, chatId)
if (!fs.existsSync(dir)) {
fs.mkdirSync(dir, { recursive: true })
}
const filePath = path.join(dir, filename)
const splitDataURI = upload.data.split(',')
const bf = Buffer.from(splitDataURI.pop() || '', 'base64')
fs.writeFileSync(filePath, bf)
// Omit upload.data since we don't store the content in database
const mime = splitDataURI[0].split(':')[1].split(';')[0]
await addFileToStorage(mime, bf, filename, chatflowid, chatId)
upload.type = 'stored-file'
// Omit upload.data since we don't store the content in database
fileUploads[i] = omit(upload, ['data'])
}
+4 -22
View File
@@ -1,8 +1,6 @@
import { ChatFlow } from '../database/entities/ChatFlow'
import path from 'path'
import { getStoragePath } from 'flowise-components'
import fs from 'fs'
import { IReactFlowObject } from '../Interface'
import { addBase64FilesToStorage } from 'flowise-components'
export const containsBase64File = (chatflow: ChatFlow) => {
const parsedFlowData: IReactFlowObject = JSON.parse(chatflow.flowData)
@@ -48,23 +46,7 @@ export const containsBase64File = (chatflow: ChatFlow) => {
return found
}
function addFileToStorage(file: string, chatflowid: string, fileNames: string[]) {
const dir = path.join(getStoragePath(), chatflowid)
if (!fs.existsSync(dir)) {
fs.mkdirSync(dir, { recursive: true })
}
const splitDataURI = file.split(',')
const filename = splitDataURI.pop()?.split(':')[1] ?? ''
const bf = Buffer.from(splitDataURI.pop() || '', 'base64')
const filePath = path.join(dir, filename)
fs.writeFileSync(filePath, bf)
fileNames.push(filename)
return 'FILE-STORAGE::' + JSON.stringify(fileNames)
}
export const updateFlowDataWithFilePaths = (chatflowid: string, flowData: string) => {
export const updateFlowDataWithFilePaths = async (chatflowid: string, flowData: string) => {
try {
const parsedFlowData: IReactFlowObject = JSON.parse(flowData)
const re = new RegExp('^data.*;base64', 'i')
@@ -93,14 +75,14 @@ export const updateFlowDataWithFilePaths = (chatflowid: string, flowData: string
for (let j = 0; j < files.length; j++) {
const file = files[j]
if (re.test(file)) {
node.data.inputs[key] = addFileToStorage(file, chatflowid, fileNames)
node.data.inputs[key] = await addBase64FilesToStorage(file, chatflowid, fileNames)
}
}
} catch (e) {
continue
}
} else if (re.test(input)) {
node.data.inputs[key] = addFileToStorage(input, chatflowid, fileNames)
node.data.inputs[key] = await addBase64FilesToStorage(input, chatflowid, fileNames)
}
}
}
-28
View File
@@ -1324,34 +1324,6 @@ export const getAllValuesFromJson = (obj: any): any[] => {
return values
}
/**
* Delete file & folder recursively
* @param {string} directory
*/
export const deleteFolderRecursive = (directory: string) => {
if (fs.existsSync(directory)) {
fs.readdir(directory, (error, files) => {
if (error) throw new Error('Could not read directory')
files.forEach((file) => {
const file_path = path.join(directory, file)
fs.stat(file_path, (error, stat) => {
if (error) throw new Error('File do not exist')
if (!stat.isDirectory()) {
fs.unlink(file_path, (error) => {
if (error) throw new Error('Could not delete file')
})
} else {
deleteFolderRecursive(file_path)
}
})
})
})
}
}
/**
* Get only essential flow data items for telemetry
* @param {IReactFlowNode[]} nodes