Feature/Add new doc store upsert and refresh API (#3556)

add new doc store upsert and refresh API
This commit is contained in:
Henry Heng
2024-11-25 15:47:13 +00:00
committed by GitHub
parent 36496b1611
commit a2c36b4447
15 changed files with 1424 additions and 803 deletions
+113 -47
View File
@@ -1,3 +1,4 @@
import { ICommonObject } from 'flowise-components'
import { DocumentStore } from './database/entities/DocumentStore'
export enum DocumentStoreStatus {
@@ -36,23 +37,25 @@ export interface IDocumentStoreFileChunk {
export interface IDocumentStoreFileChunkPagedResponse {
chunks: IDocumentStoreFileChunk[]
count: number
characters: number
file?: IDocumentStoreLoader
currentPage: number
storeName: string
description: string
docId: string
}
export interface IDocumentStoreLoader {
id: string
loaderId: string
loaderName: string
loaderConfig: any // JSON string
splitterId: string
splitterName: string
splitterConfig: any // JSON string
totalChunks: number
totalChars: number
status: DocumentStoreStatus
id?: string
loaderId?: string
loaderName?: string
loaderConfig?: any // JSON string
splitterId?: string
splitterName?: string
splitterConfig?: any // JSON string
totalChunks?: number
totalChars?: number
status?: DocumentStoreStatus
storeId?: string
files?: IDocumentStoreLoaderFile[]
source?: string
@@ -60,9 +63,37 @@ export interface IDocumentStoreLoader {
}
export interface IDocumentStoreLoaderForPreview extends IDocumentStoreLoader {
rehydrated: boolean
preview: boolean
previewChunkCount: number
rehydrated?: boolean
preview?: boolean
previewChunkCount?: number
}
export interface IDocumentStoreUpsertData {
docId: string
loader?: {
name: string
config: ICommonObject
}
splitter?: {
name: string
config: ICommonObject
}
vectorStore?: {
name: string
config: ICommonObject
}
embedding?: {
name: string
config: ICommonObject
}
recordManager?: {
name: string
config: ICommonObject
}
}
export interface IDocumentStoreRefreshData {
items: IDocumentStoreUpsertData[]
}
export interface IDocumentStoreLoaderFile {
@@ -79,6 +110,72 @@ export interface IDocumentStoreWhereUsed {
name: string
}
const getFileName = (fileBase64: string) => {
let fileNames = []
if (fileBase64.startsWith('FILE-STORAGE::')) {
const names = fileBase64.substring(14)
if (names.includes('[') && names.includes(']')) {
const files = JSON.parse(names)
return files.join(', ')
} else {
return fileBase64.substring(14)
}
}
if (fileBase64.startsWith('[') && fileBase64.endsWith(']')) {
const files = JSON.parse(fileBase64)
for (const file of files) {
const splitDataURI = file.split(',')
const filename = splitDataURI[splitDataURI.length - 1].split(':')[1]
fileNames.push(filename)
}
return fileNames.join(', ')
} else {
const splitDataURI = fileBase64.split(',')
const filename = splitDataURI[splitDataURI.length - 1].split(':')[1]
return filename
}
}
export const addLoaderSource = (loader: IDocumentStoreLoader, isGetFileNameOnly = false) => {
let source = 'None'
const handleUnstructuredFileLoader = (config: any, isGetFileNameOnly: boolean): string => {
if (config.fileObject) {
return isGetFileNameOnly ? getFileName(config.fileObject) : config.fileObject.replace('FILE-STORAGE::', '')
}
return config.filePath || 'None'
}
switch (loader.loaderId) {
case 'pdfFile':
case 'jsonFile':
case 'csvFile':
case 'file':
case 'jsonlinesFile':
case 'txtFile':
source = isGetFileNameOnly
? getFileName(loader.loaderConfig[loader.loaderId])
: loader.loaderConfig[loader.loaderId]?.replace('FILE-STORAGE::', '') || 'None'
break
case 'apiLoader':
source = loader.loaderConfig.url + ' (' + loader.loaderConfig.method + ')'
break
case 'cheerioWebScraper':
case 'playwrightWebScraper':
case 'puppeteerWebScraper':
source = loader.loaderConfig.url || 'None'
break
case 'unstructuredFileLoader':
source = handleUnstructuredFileLoader(loader.loaderConfig, isGetFileNameOnly)
break
default:
source = 'None'
break
}
return source
}
export class DocumentStoreDTO {
id: string
name: string
@@ -130,40 +227,9 @@ export class DocumentStoreDTO {
if (entity.loaders) {
documentStoreDTO.loaders = JSON.parse(entity.loaders)
documentStoreDTO.loaders.map((loader) => {
documentStoreDTO.totalChars += loader.totalChars
documentStoreDTO.totalChunks += loader.totalChunks
switch (loader.loaderId) {
case 'pdfFile':
loader.source = loader.loaderConfig.pdfFile.replace('FILE-STORAGE::', '')
break
case 'apiLoader':
loader.source = loader.loaderConfig.url + ' (' + loader.loaderConfig.method + ')'
break
case 'cheerioWebScraper':
loader.source = loader.loaderConfig.url
break
case 'playwrightWebScraper':
loader.source = loader.loaderConfig.url
break
case 'puppeteerWebScraper':
loader.source = loader.loaderConfig.url
break
case 'jsonFile':
loader.source = loader.loaderConfig.jsonFile.replace('FILE-STORAGE::', '')
break
case 'docxFile':
loader.source = loader.loaderConfig.docxFile.replace('FILE-STORAGE::', '')
break
case 'textFile':
loader.source = loader.loaderConfig.txtFile.replace('FILE-STORAGE::', '')
break
case 'unstructuredFileLoader':
loader.source = loader.loaderConfig.filePath
break
default:
loader.source = 'None'
break
}
documentStoreDTO.totalChars += loader.totalChars || 0
documentStoreDTO.totalChunks += loader.totalChunks || 0
loader.source = addLoaderSource(loader)
if (loader.status !== 'SYNC') {
documentStoreDTO.status = DocumentStoreStatus.STALE
}
@@ -4,6 +4,15 @@ import documentStoreService from '../../services/documentstore'
import { DocumentStore } from '../../database/entities/DocumentStore'
import { InternalFlowiseError } from '../../errors/internalFlowiseError'
import { DocumentStoreDTO } from '../../Interface'
import { getRateLimiter } from '../../utils/rateLimit'
const getRateLimiterMiddleware = async (req: Request, res: Response, next: NextFunction) => {
try {
return getRateLimiter(req, res, next)
} catch (error) {
next(error)
}
}
const createDocumentStore = async (req: Request, res: Response, next: NextFunction) => {
try {
@@ -160,16 +169,39 @@ const editDocumentStoreFileChunk = async (req: Request, res: Response, next: Nex
}
}
const processFileChunks = async (req: Request, res: Response, next: NextFunction) => {
const saveProcessingLoader = async (req: Request, res: Response, next: NextFunction) => {
try {
if (typeof req.body === 'undefined') {
throw new InternalFlowiseError(
StatusCodes.PRECONDITION_FAILED,
`Error: documentStoreController.processFileChunks - body not provided!`
`Error: documentStoreController.saveProcessingLoader - body not provided!`
)
}
const body = req.body
const apiResponse = await documentStoreService.processAndSaveChunks(body)
const apiResponse = await documentStoreService.saveProcessingLoader(body)
return res.json(apiResponse)
} catch (error) {
next(error)
}
}
const processLoader = async (req: Request, res: Response, next: NextFunction) => {
try {
if (typeof req.params.loaderId === 'undefined' || req.params.loaderId === '') {
throw new InternalFlowiseError(
StatusCodes.PRECONDITION_FAILED,
`Error: documentStoreController.processLoader - loaderId not provided!`
)
}
if (typeof req.body === 'undefined') {
throw new InternalFlowiseError(
StatusCodes.PRECONDITION_FAILED,
`Error: documentStoreController.processLoader - body not provided!`
)
}
const docLoaderId = req.params.loaderId
const body = req.body
const apiResponse = await documentStoreService.processLoader(body, docLoaderId)
return res.json(apiResponse)
} catch (error) {
next(error)
@@ -342,6 +374,42 @@ const getRecordManagerProviders = async (req: Request, res: Response, next: Next
}
}
const upsertDocStoreMiddleware = async (req: Request, res: Response, next: NextFunction) => {
try {
if (typeof req.params.id === 'undefined' || req.params.id === '') {
throw new InternalFlowiseError(
StatusCodes.PRECONDITION_FAILED,
`Error: documentStoreController.upsertDocStoreMiddleware - storeId not provided!`
)
}
if (typeof req.body === 'undefined') {
throw new Error('Error: documentStoreController.upsertDocStoreMiddleware - body not provided!')
}
const body = req.body
const files = (req.files as Express.Multer.File[]) || []
const apiResponse = await documentStoreService.upsertDocStoreMiddleware(req.params.id, body, files)
return res.json(apiResponse)
} catch (error) {
next(error)
}
}
const refreshDocStoreMiddleware = async (req: Request, res: Response, next: NextFunction) => {
try {
if (typeof req.params.id === 'undefined' || req.params.id === '') {
throw new InternalFlowiseError(
StatusCodes.PRECONDITION_FAILED,
`Error: documentStoreController.refreshDocStoreMiddleware - storeId not provided!`
)
}
const body = req.body
const apiResponse = await documentStoreService.refreshDocStoreMiddleware(req.params.id, body)
return res.json(apiResponse)
} catch (error) {
next(error)
}
}
export default {
deleteDocumentStore,
createDocumentStore,
@@ -350,7 +418,7 @@ export default {
getDocumentStoreById,
getDocumentStoreFileChunks,
updateDocumentStore,
processFileChunks,
processLoader,
previewFileChunks,
getDocumentLoaders,
deleteDocumentStoreFileChunk,
@@ -362,5 +430,9 @@ export default {
saveVectorStoreConfig,
queryVectorStore,
deleteVectorStoreFromStore,
updateVectorStoreConfigOnly
updateVectorStoreConfigOnly,
getRateLimiterMiddleware,
upsertDocStoreMiddleware,
refreshDocStoreMiddleware,
saveProcessingLoader
}
@@ -1,6 +1,14 @@
import express from 'express'
import documentStoreController from '../../controllers/documentstore'
import multer from 'multer'
import path from 'path'
const router = express.Router()
const upload = multer({ dest: `${path.join(__dirname, '..', '..', '..', 'uploads')}/` })
router.post(['/upsert/', '/upsert/:id'], upload.array('files'), documentStoreController.upsertDocStoreMiddleware)
router.post(['/refresh/', '/refresh/:id'], documentStoreController.refreshDocStoreMiddleware)
/** Document Store Routes */
// Create document store
@@ -22,8 +30,10 @@ router.get('/components/loaders', documentStoreController.getDocumentLoaders)
router.delete('/loader/:id/:loaderId', documentStoreController.deleteLoaderFromDocumentStore)
// chunking preview
router.post('/loader/preview', documentStoreController.previewFileChunks)
// saving process
router.post('/loader/save', documentStoreController.saveProcessingLoader)
// chunking process
router.post('/loader/process', documentStoreController.processFileChunks)
router.post('/loader/process/:loaderId', documentStoreController.processLoader)
/** Document Store - Loaders - Chunks */
// delete specific file chunk from the store
@@ -1,20 +1,28 @@
import { getRunningExpressApp } from '../../utils/getRunningExpressApp'
import { DocumentStore } from '../../database/entities/DocumentStore'
import * as fs from 'fs'
import * as path from 'path'
import {
addArrayFilesToStorage,
addSingleFileToStorage,
getFileFromStorage,
ICommonObject,
IDocument,
mapExtToInputField,
mapMimeTypeToInputField,
removeFilesFromStorage,
removeSpecificFileFromStorage
} from 'flowise-components'
import {
addLoaderSource,
ChatType,
DocumentStoreStatus,
IDocumentStoreFileChunkPagedResponse,
IDocumentStoreLoader,
IDocumentStoreLoaderFile,
IDocumentStoreLoaderForPreview,
IDocumentStoreRefreshData,
IDocumentStoreUpsertData,
IDocumentStoreWhereUsed,
INodeData
} from '../../Interface'
@@ -75,7 +83,7 @@ const getAllDocumentFileChunks = async () => {
}
}
const deleteLoaderFromDocumentStore = async (storeId: string, loaderId: string) => {
const deleteLoaderFromDocumentStore = async (storeId: string, docId: string) => {
try {
const appServer = getRunningExpressApp()
const entity = await appServer.AppDataSource.getRepository(DocumentStore).findOneBy({
@@ -88,12 +96,16 @@ const deleteLoaderFromDocumentStore = async (storeId: string, loaderId: string)
)
}
const existingLoaders = JSON.parse(entity.loaders)
const found = existingLoaders.find((uFile: IDocumentStoreLoader) => uFile.id === loaderId)
const found = existingLoaders.find((loader: IDocumentStoreLoader) => loader.id === docId)
if (found) {
if (found.files?.length) {
for (const file of found.files) {
if (file.name) {
await removeSpecificFileFromStorage(DOCUMENT_STORE_BASE_FOLDER, storeId, file.name)
try {
await removeSpecificFileFromStorage(DOCUMENT_STORE_BASE_FOLDER, storeId, file.name)
} catch (error) {
console.error(error)
}
}
}
}
@@ -169,7 +181,7 @@ const getUsedChatflowNames = async (entity: DocumentStore) => {
}
// Get chunks for a specific loader or store
const getDocumentStoreFileChunks = async (storeId: string, fileId: string, pageNo: number = 1) => {
const getDocumentStoreFileChunks = async (storeId: string, docId: string, pageNo: number = 1) => {
try {
const appServer = getRunningExpressApp()
const entity = await appServer.AppDataSource.getRepository(DocumentStore).findOneBy({
@@ -184,29 +196,34 @@ const getDocumentStoreFileChunks = async (storeId: string, fileId: string, pageN
const loaders = JSON.parse(entity.loaders)
let found: IDocumentStoreLoader | undefined
if (fileId !== 'all') {
found = loaders.find((loader: IDocumentStoreLoader) => loader.id === fileId)
if (docId !== 'all') {
found = loaders.find((loader: IDocumentStoreLoader) => loader.id === docId)
if (!found) {
throw new InternalFlowiseError(
StatusCodes.NOT_FOUND,
`Error: documentStoreServices.getDocumentStoreById - Document file ${fileId} not found`
`Error: documentStoreServices.getDocumentStoreById - Document loader ${docId} not found`
)
}
}
let totalChars = 0
loaders.forEach((loader: IDocumentStoreLoader) => {
totalChars += loader.totalChars
})
if (found) {
found.totalChars = totalChars
found.id = fileId
found.id = docId
found.status = entity.status
}
let characters = 0
if (docId === 'all') {
loaders.forEach((loader: IDocumentStoreLoader) => {
characters += loader.totalChars || 0
})
} else {
characters = found?.totalChars || 0
}
const PAGE_SIZE = 50
const skip = (pageNo - 1) * PAGE_SIZE
const take = PAGE_SIZE
let whereCondition: any = { docId: fileId }
if (fileId === 'all') {
let whereCondition: any = { docId: docId }
if (docId === 'all') {
whereCondition = { storeId: storeId }
}
const count = await appServer.AppDataSource.getRepository(DocumentStoreFileChunk).count({
@@ -222,7 +239,7 @@ const getDocumentStoreFileChunks = async (storeId: string, fileId: string, pageN
})
if (!chunksWithCount) {
throw new InternalFlowiseError(StatusCodes.NOT_FOUND, `File ${fileId} not found`)
throw new InternalFlowiseError(StatusCodes.NOT_FOUND, `Chunks with docId: ${docId} not found`)
}
const response: IDocumentStoreFileChunkPagedResponse = {
@@ -231,7 +248,9 @@ const getDocumentStoreFileChunks = async (storeId: string, fileId: string, pageN
file: found,
currentPage: pageNo,
storeName: entity.name,
description: entity.description
description: entity.description,
docId: docId,
characters
}
return response
} catch (error) {
@@ -465,7 +484,7 @@ const _splitIntoChunks = async (data: IDocumentStoreLoaderForPreview) => {
try {
const appServer = getRunningExpressApp()
let splitterInstance = null
if (data.splitterConfig && Object.keys(data.splitterConfig).length > 0) {
if (data.splitterId && data.splitterConfig && Object.keys(data.splitterConfig).length > 0) {
const nodeInstanceFilePath = appServer.nodesPool.componentNodes[data.splitterId].filePath as string
const nodeModule = await import(nodeInstanceFilePath)
const newNodeInstance = new nodeModule.nodeClass()
@@ -475,11 +494,12 @@ const _splitIntoChunks = async (data: IDocumentStoreLoaderForPreview) => {
}
splitterInstance = await newNodeInstance.init(nodeData)
}
if (!data.loaderId) return []
const nodeInstanceFilePath = appServer.nodesPool.componentNodes[data.loaderId].filePath as string
const nodeModule = await import(nodeInstanceFilePath)
// doc loader configs
const nodeData = {
credential: data.credential || undefined,
credential: data.credential || data.loaderConfig['FLOWISE_CREDENTIAL_ID'] || undefined,
inputs: { ...data.loaderConfig, textSplitter: splitterInstance },
outputs: { output: 'document' }
}
@@ -568,9 +588,9 @@ const previewChunks = async (data: IDocumentStoreLoaderForPreview) => {
// if -1, return all chunks
if (data.previewChunkCount === -1) data.previewChunkCount = totalChunks
// return all docs if the user ask for more than we have
if (totalChunks <= data.previewChunkCount) data.previewChunkCount = totalChunks
if (totalChunks <= (data.previewChunkCount || 0)) data.previewChunkCount = totalChunks
// return only the first n chunks
if (totalChunks > data.previewChunkCount) docs = docs.slice(0, data.previewChunkCount)
if (totalChunks > (data.previewChunkCount || 0)) docs = docs.slice(0, data.previewChunkCount)
return { chunks: docs, totalChunks: totalChunks, previewChunkCount: data.previewChunkCount }
} catch (error) {
@@ -581,7 +601,7 @@ const previewChunks = async (data: IDocumentStoreLoaderForPreview) => {
}
}
const processAndSaveChunks = async (data: IDocumentStoreLoaderForPreview) => {
const saveProcessingLoader = async (data: IDocumentStoreLoaderForPreview): Promise<IDocumentStoreLoader> => {
try {
const appServer = getRunningExpressApp()
const entity = await appServer.AppDataSource.getRepository(DocumentStore).findOneBy({
@@ -590,14 +610,14 @@ const processAndSaveChunks = async (data: IDocumentStoreLoaderForPreview) => {
if (!entity) {
throw new InternalFlowiseError(
StatusCodes.NOT_FOUND,
`Error: documentStoreServices.processAndSaveChunks - Document store ${data.storeId} not found`
`Error: documentStoreServices.saveProcessingLoader - Document store ${data.storeId} not found`
)
}
const existingLoaders = JSON.parse(entity.loaders)
const newLoaderId = data.id ?? uuidv4()
const found = existingLoaders.find((ldr: IDocumentStoreLoader) => ldr.id === newLoaderId)
const newDocLoaderId = data.id ?? uuidv4()
const found = existingLoaders.find((ldr: IDocumentStoreLoader) => ldr.id === newDocLoaderId)
if (found) {
const foundIndex = existingLoaders.findIndex((ldr: IDocumentStoreLoader) => ldr.id === newLoaderId)
const foundIndex = existingLoaders.findIndex((ldr: IDocumentStoreLoader) => ldr.id === newDocLoaderId)
if (!data.loaderId) data.loaderId = found.loaderId
if (!data.loaderName) data.loaderName = found.loaderName
@@ -629,7 +649,7 @@ const processAndSaveChunks = async (data: IDocumentStoreLoaderForPreview) => {
entity.loaders = JSON.stringify(existingLoaders)
} else {
let loader: IDocumentStoreLoader = {
id: newLoaderId,
id: newDocLoaderId,
loaderId: data.loaderId,
loaderName: data.loaderName,
loaderConfig: data.loaderConfig,
@@ -647,13 +667,40 @@ const processAndSaveChunks = async (data: IDocumentStoreLoaderForPreview) => {
entity.loaders = JSON.stringify(existingLoaders)
}
await appServer.AppDataSource.getRepository(DocumentStore).save(entity)
// this method will run async, will have to be moved to a worker thread
_saveChunksToStorage(data, entity, newLoaderId).then(() => {})
return getDocumentStoreFileChunks(data.storeId as string, newLoaderId)
const newLoaders = JSON.parse(entity.loaders)
const newLoader = newLoaders.find((ldr: IDocumentStoreLoader) => ldr.id === newDocLoaderId)
if (!newLoader) {
throw new Error(`Loader ${newDocLoaderId} not found`)
}
newLoader.source = addLoaderSource(newLoader, true)
return newLoader
} catch (error) {
throw new InternalFlowiseError(
StatusCodes.INTERNAL_SERVER_ERROR,
`Error: documentStoreServices.processAndSaveChunks - ${getErrorMessage(error)}`
`Error: documentStoreServices.saveProcessingLoader - ${getErrorMessage(error)}`
)
}
}
const processLoader = async (data: IDocumentStoreLoaderForPreview, docLoaderId: string) => {
try {
const appServer = getRunningExpressApp()
const entity = await appServer.AppDataSource.getRepository(DocumentStore).findOneBy({
id: data.storeId
})
if (!entity) {
throw new InternalFlowiseError(
StatusCodes.NOT_FOUND,
`Error: documentStoreServices.processLoader - Document store ${data.storeId} not found`
)
}
// this method will run async, will have to be moved to a worker thread
await _saveChunksToStorage(data, entity, docLoaderId)
return getDocumentStoreFileChunks(data.storeId as string, docLoaderId)
} catch (error) {
throw new InternalFlowiseError(
StatusCodes.INTERNAL_SERVER_ERROR,
`Error: documentStoreServices.processLoader - ${getErrorMessage(error)}`
)
}
}
@@ -665,100 +712,111 @@ const _saveChunksToStorage = async (data: IDocumentStoreLoaderForPreview, entity
const appServer = getRunningExpressApp()
//step 1: restore the full paths, if any
await _normalizeFilePaths(data, entity)
//step 2: split the file into chunks
previewChunks(data).then(async (response) => {
//step 3: remove all files associated with the loader
const existingLoaders = JSON.parse(entity.loaders)
const loader = existingLoaders.find((ldr: IDocumentStoreLoader) => ldr.id === newLoaderId)
if (data.id) {
const index = existingLoaders.indexOf(loader)
if (index > -1) {
existingLoaders.splice(index, 1)
if (!data.rehydrated) {
if (loader.files) {
loader.files.map(async (file: IDocumentStoreLoaderFile) => {
const response = await previewChunks(data)
//step 3: remove all files associated with the loader
const existingLoaders = JSON.parse(entity.loaders)
const loader = existingLoaders.find((ldr: IDocumentStoreLoader) => ldr.id === newLoaderId)
if (data.id) {
const index = existingLoaders.indexOf(loader)
if (index > -1) {
existingLoaders.splice(index, 1)
if (!data.rehydrated) {
if (loader.files) {
loader.files.map(async (file: IDocumentStoreLoaderFile) => {
try {
await removeSpecificFileFromStorage(DOCUMENT_STORE_BASE_FOLDER, entity.id, file.name)
})
}
} catch (error) {
console.error(error)
}
})
}
}
}
//step 4: save new file to storage
let filesWithMetadata = []
const keys = Object.getOwnPropertyNames(data.loaderConfig)
for (let i = 0; i < keys.length; i++) {
const input = data.loaderConfig[keys[i]]
if (!input) {
continue
}
if (typeof input !== 'string') {
continue
}
if (input.startsWith('[') && input.endsWith(']')) {
const files = JSON.parse(input)
const fileNames: string[] = []
for (let j = 0; j < files.length; j++) {
const file = files[j]
if (re.test(file)) {
const fileMetadata = await _saveFileToStorage(file, entity)
fileNames.push(fileMetadata.name)
filesWithMetadata.push(fileMetadata)
}
}
//step 4: save new file to storage
let filesWithMetadata = []
const keys = Object.getOwnPropertyNames(data.loaderConfig)
for (let i = 0; i < keys.length; i++) {
const input = data.loaderConfig[keys[i]]
if (!input) {
continue
}
if (typeof input !== 'string') {
continue
}
if (input.startsWith('[') && input.endsWith(']')) {
const files = JSON.parse(input)
const fileNames: string[] = []
for (let j = 0; j < files.length; j++) {
const file = files[j]
if (re.test(file)) {
const fileMetadata = await _saveFileToStorage(file, entity)
fileNames.push(fileMetadata.name)
filesWithMetadata.push(fileMetadata)
}
data.loaderConfig[keys[i]] = 'FILE-STORAGE::' + JSON.stringify(fileNames)
} else if (re.test(input)) {
const fileNames: string[] = []
const fileMetadata = await _saveFileToStorage(input, entity)
fileNames.push(fileMetadata.name)
filesWithMetadata.push(fileMetadata)
data.loaderConfig[keys[i]] = 'FILE-STORAGE::' + JSON.stringify(fileNames)
break
}
data.loaderConfig[keys[i]] = 'FILE-STORAGE::' + JSON.stringify(fileNames)
} else if (re.test(input)) {
const fileNames: string[] = []
const fileMetadata = await _saveFileToStorage(input, entity)
fileNames.push(fileMetadata.name)
filesWithMetadata.push(fileMetadata)
data.loaderConfig[keys[i]] = 'FILE-STORAGE::' + JSON.stringify(fileNames)
break
}
//step 5: update with the new files and loaderConfig
if (filesWithMetadata.length > 0) {
loader.loaderConfig = data.loaderConfig
loader.files = filesWithMetadata
}
//step 6: update the loaders with the new loaderConfig
if (data.id) {
existingLoaders.push(loader)
}
//step 7: remove all previous chunks
await appServer.AppDataSource.getRepository(DocumentStoreFileChunk).delete({ docId: newLoaderId })
if (response.chunks) {
//step 8: now save the new chunks
const totalChars = response.chunks.reduce((acc, chunk) => {
if (chunk.pageContent) {
return acc + chunk.pageContent.length
}
return acc
}, 0)
response.chunks.map(async (chunk: IDocument, index: number) => {
const docChunk: DocumentStoreFileChunk = {
docId: newLoaderId,
storeId: data.storeId || '',
id: uuidv4(),
chunkNo: index + 1,
pageContent: chunk.pageContent,
metadata: JSON.stringify(chunk.metadata)
}
const dChunk = appServer.AppDataSource.getRepository(DocumentStoreFileChunk).create(docChunk)
await appServer.AppDataSource.getRepository(DocumentStoreFileChunk).save(dChunk)
})
// update the loader with the new metrics
loader.totalChunks = response.totalChunks
loader.totalChars = totalChars
}
loader.status = 'SYNC'
// have a flag and iterate over the loaders and update the entity status to SYNC
const allSynced = existingLoaders.every((ldr: IDocumentStoreLoader) => ldr.status === 'SYNC')
entity.status = allSynced ? DocumentStoreStatus.SYNC : DocumentStoreStatus.STALE
entity.loaders = JSON.stringify(existingLoaders)
//step 9: update the entity in the database
await appServer.AppDataSource.getRepository(DocumentStore).save(entity)
return
})
}
//step 5: update with the new files and loaderConfig
if (filesWithMetadata.length > 0) {
loader.loaderConfig = data.loaderConfig
loader.files = filesWithMetadata
}
//step 6: update the loaders with the new loaderConfig
if (data.id) {
existingLoaders.push(loader)
}
//step 7: remove all previous chunks
await appServer.AppDataSource.getRepository(DocumentStoreFileChunk).delete({ docId: newLoaderId })
if (response.chunks) {
//step 8: now save the new chunks
const totalChars = response.chunks.reduce((acc, chunk) => {
if (chunk.pageContent) {
return acc + chunk.pageContent.length
}
return acc
}, 0)
response.chunks.map(async (chunk: IDocument, index: number) => {
const docChunk: DocumentStoreFileChunk = {
docId: newLoaderId,
storeId: data.storeId || '',
id: uuidv4(),
chunkNo: index + 1,
pageContent: chunk.pageContent,
metadata: JSON.stringify(chunk.metadata)
}
const dChunk = appServer.AppDataSource.getRepository(DocumentStoreFileChunk).create(docChunk)
await appServer.AppDataSource.getRepository(DocumentStoreFileChunk).save(dChunk)
})
// update the loader with the new metrics
loader.totalChunks = response.totalChunks
loader.totalChars = totalChars
}
loader.status = 'SYNC'
// have a flag and iterate over the loaders and update the entity status to SYNC
const allSynced = existingLoaders.every((ldr: IDocumentStoreLoader) => ldr.status === 'SYNC')
entity.status = allSynced ? DocumentStoreStatus.SYNC : DocumentStoreStatus.STALE
entity.loaders = JSON.stringify(existingLoaders)
//step 9: update the entity in the database
await appServer.AppDataSource.getRepository(DocumentStore).save(entity)
return
} catch (error) {
throw new InternalFlowiseError(
StatusCodes.INTERNAL_SERVER_ERROR,
@@ -960,11 +1018,16 @@ const _insertIntoVectorStoreWorkerThread = async (data: ICommonObject) => {
// Get Vector Store Node Data
const vStoreNodeData = _createVectorStoreNodeData(appServer, data, embeddingObj, recordManagerObj)
// Prepare docs for upserting
const filterOptions: ICommonObject = {
storeId: data.storeId
}
if (data.docId) {
filterOptions['docId'] = data.docId
}
const chunks = await appServer.AppDataSource.getRepository(DocumentStoreFileChunk).find({
where: {
storeId: data.storeId
}
where: filterOptions
})
const docs: Document[] = chunks.map((chunk: DocumentStoreFileChunk) => {
return new Document({
@@ -1248,6 +1311,263 @@ const _createVectorStoreObject = async (
return vStoreNodeInstance
}
const upsertDocStoreMiddleware = async (
storeId: string,
data: IDocumentStoreUpsertData,
files: Express.Multer.File[] = [],
isRefreshExisting = false
) => {
const appServer = getRunningExpressApp()
const docId = data.docId
const newLoader = typeof data.loader === 'string' ? JSON.parse(data.loader) : data.loader
const newSplitter = typeof data.splitter === 'string' ? JSON.parse(data.splitter) : data.splitter
const newVectorStore = typeof data.vectorStore === 'string' ? JSON.parse(data.vectorStore) : data.vectorStore
const newEmbedding = typeof data.embedding === 'string' ? JSON.parse(data.embedding) : data.embedding
const newRecordManager = typeof data.recordManager === 'string' ? JSON.parse(data.recordManager) : data.recordManager
const getComponentLabelFromName = (nodeName: string) => {
const component = Object.values(appServer.nodesPool.componentNodes).find((node) => node.name === nodeName)
return component?.label || ''
}
let loaderName = ''
let loaderId = ''
let loaderConfig: ICommonObject = {}
let splitterName = ''
let splitterId = ''
let splitterConfig: ICommonObject = {}
let vectorStoreName = ''
let vectorStoreConfig: ICommonObject = {}
let embeddingName = ''
let embeddingConfig: ICommonObject = {}
let recordManagerName = ''
let recordManagerConfig: ICommonObject = {}
// Step 1: Get existing loader
if (docId) {
const entity = await appServer.AppDataSource.getRepository(DocumentStore).findOneBy({ id: storeId })
if (!entity) {
throw new InternalFlowiseError(StatusCodes.NOT_FOUND, `Document store ${storeId} not found`)
}
const loaders = JSON.parse(entity.loaders)
const loader = loaders.find((ldr: IDocumentStoreLoader) => ldr.id === docId)
if (!loader) {
throw new InternalFlowiseError(StatusCodes.NOT_FOUND, `Document loader ${docId} not found`)
}
// Loader
loaderName = loader.loaderName
loaderId = loader.loaderId
loaderConfig = {
...loaderConfig,
...loader?.loaderConfig
}
// Splitter
splitterName = loader.splitterName
splitterId = loader.splitterId
splitterConfig = {
...splitterConfig,
...loader?.splitterConfig
}
// Vector Store
vectorStoreName = JSON.parse(entity.vectorStoreConfig || '{}')?.name
vectorStoreConfig = JSON.parse(entity.vectorStoreConfig || '{}')?.config
// Embedding
embeddingName = JSON.parse(entity.embeddingConfig || '{}')?.name
embeddingConfig = JSON.parse(entity.embeddingConfig || '{}')?.config
// Record Manager
recordManagerName = JSON.parse(entity.recordManagerConfig || '{}')?.name
recordManagerConfig = JSON.parse(entity.recordManagerConfig || '{}')?.config
}
// Step 2: Replace with new values
loaderName = newLoader?.name ? getComponentLabelFromName(newLoader?.name) : loaderName
loaderId = newLoader?.name || loaderId
loaderConfig = {
...loaderConfig,
...newLoader?.config
}
splitterName = newSplitter?.name ? getComponentLabelFromName(newSplitter?.name) : splitterName
splitterId = newSplitter?.name || splitterId
splitterConfig = {
...splitterConfig,
...newSplitter?.config
}
vectorStoreName = newVectorStore?.name || vectorStoreName
vectorStoreConfig = {
...vectorStoreConfig,
...newVectorStore?.config
}
embeddingName = newEmbedding?.name || embeddingName
embeddingConfig = {
...embeddingConfig,
...newEmbedding?.config
}
recordManagerName = newRecordManager?.name || recordManagerName
recordManagerConfig = {
recordManagerConfig,
...newRecordManager?.config
}
// Step 3: Replace with files
if (files.length) {
const filesLoaderConfig: ICommonObject = {}
for (const file of files) {
const fileNames: string[] = []
const fileBuffer = fs.readFileSync(file.path)
// Address file name with special characters: https://github.com/expressjs/multer/issues/1104
file.originalname = Buffer.from(file.originalname, 'latin1').toString('utf8')
try {
await addArrayFilesToStorage(file.mimetype, fileBuffer, file.originalname, fileNames, DOCUMENT_STORE_BASE_FOLDER, storeId)
} catch (error) {
continue
}
const mimePrefix = 'data:' + file.mimetype + ';base64'
const storagePath = mimePrefix + ',' + fileBuffer.toString('base64') + `,filename:${file.originalname}`
const fileInputFieldFromMimeType = mapMimeTypeToInputField(file.mimetype)
const fileExtension = path.extname(file.originalname)
const fileInputFieldFromExt = mapExtToInputField(fileExtension)
let fileInputField = 'txtFile'
if (fileInputFieldFromExt !== 'txtFile') {
fileInputField = fileInputFieldFromExt
} else if (fileInputFieldFromMimeType !== 'txtFile') {
fileInputField = fileInputFieldFromExt
}
if (loaderId === 'unstructuredFileLoader') {
fileInputField = 'fileObject'
}
if (filesLoaderConfig[fileInputField]) {
const existingFileInputFieldArray = JSON.parse(filesLoaderConfig[fileInputField])
const newFileInputFieldArray = [storagePath]
const updatedFieldArray = existingFileInputFieldArray.concat(newFileInputFieldArray)
filesLoaderConfig[fileInputField] = JSON.stringify(updatedFieldArray)
} else {
filesLoaderConfig[fileInputField] = JSON.stringify([storagePath])
}
fs.unlinkSync(file.path)
}
loaderConfig = {
...loaderConfig,
...filesLoaderConfig
}
}
// Step 4: Verification for must have components
if (!loaderName || !loaderId || !loaderConfig) {
throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, `Loader not configured`)
}
if (!vectorStoreName || !vectorStoreConfig) {
throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, `Vector store not configured`)
}
if (!embeddingName || !embeddingConfig) {
throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, `Embedding not configured`)
}
// Step 5: Process & Upsert
const processData: IDocumentStoreLoaderForPreview = {
storeId,
loaderId,
loaderName,
loaderConfig,
splitterId,
splitterName,
splitterConfig
}
if (isRefreshExisting) {
processData.id = docId
}
try {
const newLoader = await saveProcessingLoader(processData)
const result = await processLoader(processData, newLoader.id || '')
const newDocId = result.docId
const insertData = {
storeId,
docId: newDocId,
vectorStoreName,
vectorStoreConfig,
embeddingName,
embeddingConfig,
recordManagerName,
recordManagerConfig
}
const res = await insertIntoVectorStore(insertData)
res.docId = newDocId
return res
} catch (error) {
throw new InternalFlowiseError(
StatusCodes.INTERNAL_SERVER_ERROR,
`Error: documentStoreServices.upsertDocStoreMiddleware - ${getErrorMessage(error)}`
)
}
}
const refreshDocStoreMiddleware = async (storeId: string, data?: IDocumentStoreRefreshData) => {
const appServer = getRunningExpressApp()
try {
const results = []
let totalItems: IDocumentStoreUpsertData[] = []
if (!data || !data.items || data.items.length === 0) {
const entity = await appServer.AppDataSource.getRepository(DocumentStore).findOneBy({ id: storeId })
if (!entity) {
throw new InternalFlowiseError(StatusCodes.NOT_FOUND, `Document store ${storeId} not found`)
}
const loaders = JSON.parse(entity.loaders)
totalItems = loaders.map((ldr: IDocumentStoreLoader) => {
return {
docId: ldr.id
}
})
} else {
totalItems = data.items
}
for (const item of totalItems) {
const res = await upsertDocStoreMiddleware(storeId, item, [], true)
results.push(res)
}
return results
} catch (error) {
throw new InternalFlowiseError(
StatusCodes.INTERNAL_SERVER_ERROR,
`Error: documentStoreServices.refreshDocStoreMiddleware - ${getErrorMessage(error)}`
)
}
}
export default {
updateDocumentStoreUsage,
deleteDocumentStore,
@@ -1260,7 +1580,8 @@ export default {
getDocumentStoreFileChunks,
updateDocumentStore,
previewChunks,
processAndSaveChunks,
saveProcessingLoader,
processLoader,
deleteDocumentStoreFileChunk,
editDocumentStoreFileChunk,
getDocumentLoaders,
@@ -1271,5 +1592,7 @@ export default {
saveVectorStoreConfig,
queryVectorStore,
deleteVectorStoreFromStore,
updateVectorStoreConfigOnly
updateVectorStoreConfigOnly,
upsertDocStoreMiddleware,
refreshDocStoreMiddleware
}
+12 -3
View File
@@ -39,6 +39,7 @@ export const createFileAttachment = async (req: Request) => {
const files = (req.files as Express.Multer.File[]) || []
const fileAttachments = []
if (files.length) {
const isBase64 = req.body.base64
for (const file of files) {
const fileBuffer = fs.readFileSync(file.path)
const fileNames: string[] = []
@@ -70,13 +71,21 @@ export const createFileAttachment = async (req: Request) => {
[fileInputField]: storagePath
}
}
const documents: IDocument[] = await fileLoaderNodeInstance.init(nodeData, '', options)
const pageContents = documents.map((doc) => doc.pageContent).join('\n')
let content = ''
if (isBase64) {
content = fileBuffer.toString('base64')
} else {
const documents: IDocument[] = await fileLoaderNodeInstance.init(nodeData, '', options)
content = documents.map((doc) => doc.pageContent).join('\n')
}
fileAttachments.push({
name: file.originalname,
mimeType: file.mimetype,
size: file.size,
content: pageContents
content
})
} catch (error) {
throw new Error(`Failed operation: createFileAttachment - ${getErrorMessage(error)}`)