Feature/DocumentStore (#2106)

* datasource: initial commit

* datasource: datasource details and chunks

* datasource: Document Store Node

* more changes

* Document Store - Base functionality

* Document Store Loader Component

* Document Store Loader Component

* before merging the modularity PR

* after merging the modularity PR

* preview mode

* initial draft PR

* fixes

* minor updates and  fixes

* preview with loader and splitter

* preview with credential

* show stored chunks

* preview update...

* edit config

* save, preview and other changes

* save, preview and other changes

* save, process and other changes

* save, process and other changes

* alpha1 - for internal testing

* rerouting urls

* bug fix on new leader create

* pagination support for chunks

* delete document store

* Update pnpm-lock.yaml

* doc store card view

* Update store files to use updated storage functions, Document Store Table View and other changes

* ui changes

* add expanded chunk dialog, improve ui

* change throw Error to InternalError

* Bug Fixes and removal of subFolder, adding of view chunks for store

* lint fixes

* merge changes

* DocumentStoreStatus component

* ui changes for doc store

* add remove metadata key field, add custom document loader

* add chatflows used doc store chips

* add types/interfaces to DocumentStore Services

* document loader list dialog title bar color change

* update interfaces

* Whereused Chatflow Name and Added chunkNo to retain order of created chunks.

* use typeorm order chunkNo, ui changes

---------

Co-authored-by: Henry <hzj94@hotmail.com>
Co-authored-by: Henry Heng <henryheng@flowiseai.com>
This commit is contained in:
Vinod Kiran
2024-05-06 19:53:27 +05:30
committed by GitHub
parent af4e28aa91
commit 40e36d1b39
91 changed files with 38713 additions and 32791 deletions
@@ -13,6 +13,7 @@ import { ChatMessageFeedback } from '../../database/entities/ChatMessageFeedback
import { UpsertHistory } from '../../database/entities/UpsertHistory'
import { containsBase64File, updateFlowDataWithFilePaths } from '../../utils/fileRepository'
import { getErrorMessage } from '../../errors/utils'
import documentStoreService from '../../services/documentstore'
// Check if chatflow valid for streaming
const checkIfChatflowIsValidForStreaming = async (chatflowId: string): Promise<any> => {
@@ -76,6 +77,7 @@ const deleteChatflow = async (chatflowId: string): Promise<any> => {
try {
// Delete all uploads corresponding to this chatflow
await removeFolderFromStorage(chatflowId)
await documentStoreService.updateDocumentStoreUsage(chatflowId, undefined)
// Delete all chat messages
await appServer.AppDataSource.getRepository(ChatMessage).delete({ chatflowid: chatflowId })
@@ -166,6 +168,7 @@ const saveChatflow = async (newChatFlow: ChatFlow): Promise<any> => {
// step 2 - convert base64 to file paths and update the chatflow
step1Results.flowData = await updateFlowDataWithFilePaths(step1Results.id, incomingFlowData)
await _checkAndUpdateDocumentStoreUsage(step1Results)
dbResponse = await appServer.AppDataSource.getRepository(ChatFlow).save(step1Results)
} else {
const chatflow = appServer.AppDataSource.getRepository(ChatFlow).create(newChatFlow)
@@ -192,6 +195,7 @@ const updateChatflow = async (chatflow: ChatFlow, updateChatFlow: ChatFlow): Pro
updateChatFlow.flowData = await updateFlowDataWithFilePaths(chatflow.id, updateChatFlow.flowData)
}
const newDbChatflow = appServer.AppDataSource.getRepository(ChatFlow).merge(chatflow, updateChatFlow)
await _checkAndUpdateDocumentStoreUsage(newDbChatflow)
const dbResponse = await appServer.AppDataSource.getRepository(ChatFlow).save(newDbChatflow)
// chatFlowPool is initialized only when a flow is opened
@@ -261,6 +265,18 @@ const getSinglePublicChatbotConfig = async (chatflowId: string): Promise<any> =>
}
}
const _checkAndUpdateDocumentStoreUsage = async (chatflow: ChatFlow) => {
const parsedFlowData: IReactFlowObject = JSON.parse(chatflow.flowData)
const nodes = parsedFlowData.nodes
// from the nodes array find if there is a node with name == documentStore)
const node = nodes.length > 0 && nodes.find((node) => node.data.name === 'documentStore')
if (!node || !node.data || !node.data.inputs || node.data.inputs['selectedStore'] === undefined) {
await documentStoreService.updateDocumentStoreUsage(chatflow.id, undefined)
} else {
await documentStoreService.updateDocumentStoreUsage(chatflow.id, node.data.inputs['selectedStore'])
}
}
export default {
checkIfChatflowIsValidForStreaming,
checkIfChatflowIsValidForUploads,
@@ -0,0 +1,710 @@
import { getRunningExpressApp } from '../../utils/getRunningExpressApp'
import { DocumentStore } from '../../database/entities/DocumentStore'
// @ts-ignore
import {
addFileToStorage,
getFileFromStorage,
ICommonObject,
IDocument,
removeFilesFromStorage,
removeSpecificFileFromStorage
} from 'flowise-components'
import {
DocumentStoreStatus,
IDocumentStoreFileChunkPagedResponse,
IDocumentStoreLoader,
IDocumentStoreLoaderFile,
IDocumentStoreLoaderForPreview,
IDocumentStoreWhereUsed
} from '../../Interface'
import { DocumentStoreFileChunk } from '../../database/entities/DocumentStoreFileChunk'
import { v4 as uuidv4 } from 'uuid'
import { databaseEntities } from '../../utils'
import logger from '../../utils/logger'
import nodesService from '../nodes'
import { InternalFlowiseError } from '../../errors/internalFlowiseError'
import { StatusCodes } from 'http-status-codes'
import { getErrorMessage } from '../../errors/utils'
import { ChatFlow } from '../../database/entities/ChatFlow'
const DOCUMENT_STORE_BASE_FOLDER = 'docustore'
const createDocumentStore = async (newDocumentStore: DocumentStore) => {
try {
const appServer = getRunningExpressApp()
const documentStore = appServer.AppDataSource.getRepository(DocumentStore).create(newDocumentStore)
const dbResponse = await appServer.AppDataSource.getRepository(DocumentStore).save(documentStore)
return dbResponse
} catch (error) {
throw new InternalFlowiseError(
StatusCodes.INTERNAL_SERVER_ERROR,
`Error: documentStoreServices.createDocumentStore - ${getErrorMessage(error)}`
)
}
}
const getAllDocumentStores = async () => {
try {
const appServer = getRunningExpressApp()
const entities = await appServer.AppDataSource.getRepository(DocumentStore).find()
return entities
} catch (error) {
throw new InternalFlowiseError(
StatusCodes.INTERNAL_SERVER_ERROR,
`Error: documentStoreServices.getAllDocumentStores - ${getErrorMessage(error)}`
)
}
}
const deleteLoaderFromDocumentStore = async (storeId: string, loaderId: string) => {
try {
const appServer = getRunningExpressApp()
const entity = await appServer.AppDataSource.getRepository(DocumentStore).findOneBy({
id: storeId
})
if (!entity) {
throw new InternalFlowiseError(
StatusCodes.NOT_FOUND,
`Error: documentStoreServices.deleteLoaderFromDocumentStore - Document store ${storeId} not found`
)
}
const existingLoaders = JSON.parse(entity.loaders)
const found = existingLoaders.find((uFile: IDocumentStoreLoader) => uFile.id === loaderId)
if (found) {
if (found.path) {
//remove the existing files, if any of the file loaders were used.
await removeSpecificFileFromStorage(DOCUMENT_STORE_BASE_FOLDER, entity.id, found.path)
}
const index = existingLoaders.indexOf(found)
if (index > -1) {
existingLoaders.splice(index, 1)
}
// remove the chunks
await appServer.AppDataSource.getRepository(DocumentStoreFileChunk).delete({ docId: found.id })
entity.loaders = JSON.stringify(existingLoaders)
const results = await appServer.AppDataSource.getRepository(DocumentStore).save(entity)
return results
} else {
throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, `Unable to locate loader in Document Store ${entity.name}`)
}
} catch (error) {
throw new InternalFlowiseError(
StatusCodes.INTERNAL_SERVER_ERROR,
`Error: documentStoreServices.deleteLoaderFromDocumentStore - ${getErrorMessage(error)}`
)
}
}
const getDocumentStoreById = async (storeId: string) => {
try {
const appServer = getRunningExpressApp()
const entity = await appServer.AppDataSource.getRepository(DocumentStore).findOneBy({
id: storeId
})
if (!entity) {
throw new InternalFlowiseError(
StatusCodes.NOT_FOUND,
`Error: documentStoreServices.getDocumentStoreById - Document store ${storeId} not found`
)
}
return entity
} catch (error) {
throw new InternalFlowiseError(
StatusCodes.INTERNAL_SERVER_ERROR,
`Error: documentStoreServices.getDocumentStoreById - ${getErrorMessage(error)}`
)
}
}
const getUsedChatflowNames = async (entity: DocumentStore) => {
try {
const appServer = getRunningExpressApp()
if (entity.whereUsed) {
const whereUsed = JSON.parse(entity.whereUsed)
const updatedWhereUsed: IDocumentStoreWhereUsed[] = []
for (let i = 0; i < whereUsed.length; i++) {
const associatedChatflow = await appServer.AppDataSource.getRepository(ChatFlow).findOne({
where: { id: whereUsed[i] },
select: ['id', 'name']
})
if (associatedChatflow) {
updatedWhereUsed.push({
id: whereUsed[i],
name: associatedChatflow.name
})
}
}
return updatedWhereUsed
}
return []
} catch (error) {
throw new InternalFlowiseError(
StatusCodes.INTERNAL_SERVER_ERROR,
`Error: documentStoreServices.getUsedChatflowNames - ${getErrorMessage(error)}`
)
}
}
// Get chunks for a specific loader or store
const getDocumentStoreFileChunks = async (storeId: string, fileId: string, pageNo: number = 1) => {
try {
const appServer = getRunningExpressApp()
const entity = await appServer.AppDataSource.getRepository(DocumentStore).findOneBy({
id: storeId
})
if (!entity) {
throw new InternalFlowiseError(
StatusCodes.NOT_FOUND,
`Error: documentStoreServices.getDocumentStoreById - Document store ${storeId} not found`
)
}
const loaders = JSON.parse(entity.loaders)
let found: IDocumentStoreLoader | undefined
if (fileId !== 'all') {
found = loaders.find((loader: IDocumentStoreLoader) => loader.id === fileId)
if (!found) {
throw new InternalFlowiseError(
StatusCodes.NOT_FOUND,
`Error: documentStoreServices.getDocumentStoreById - Document file ${fileId} not found`
)
}
}
let totalChars = 0
loaders.forEach((loader: IDocumentStoreLoader) => {
totalChars += loader.totalChars
})
if (found) {
found.totalChars = totalChars
found.id = entity.id
found.status = entity.status
}
const PAGE_SIZE = 50
const skip = (pageNo - 1) * PAGE_SIZE
const take = PAGE_SIZE
let whereCondition: any = { docId: fileId }
if (fileId === 'all') {
whereCondition = { storeId: storeId }
}
const count = await appServer.AppDataSource.getRepository(DocumentStoreFileChunk).count({
where: whereCondition
})
const chunksWithCount = await appServer.AppDataSource.getRepository(DocumentStoreFileChunk).find({
skip,
take,
where: whereCondition,
order: {
chunkNo: 'ASC'
}
})
if (!chunksWithCount) {
throw new InternalFlowiseError(StatusCodes.NOT_FOUND, `File ${fileId} not found`)
}
const response: IDocumentStoreFileChunkPagedResponse = {
chunks: chunksWithCount,
count: count,
file: found,
currentPage: pageNo,
storeName: entity.name,
description: entity.description
}
return response
} catch (error) {
throw new InternalFlowiseError(
StatusCodes.INTERNAL_SERVER_ERROR,
`Error: documentStoreServices.getDocumentStoreFileChunks - ${getErrorMessage(error)}`
)
}
}
const deleteDocumentStore = async (storeId: string) => {
try {
const appServer = getRunningExpressApp()
// delete all the chunks associated with the store
await appServer.AppDataSource.getRepository(DocumentStoreFileChunk).delete({
storeId: storeId
})
// now delete the files associated with the store
const entity = await appServer.AppDataSource.getRepository(DocumentStore).findOneBy({
id: storeId
})
if (!entity) throw new Error(`Document store ${storeId} not found`)
await removeFilesFromStorage(DOCUMENT_STORE_BASE_FOLDER, entity.id)
// now delete the store
const tbd = await appServer.AppDataSource.getRepository(DocumentStore).delete({
id: storeId
})
return { deleted: tbd.affected }
} catch (error) {
throw new InternalFlowiseError(
StatusCodes.INTERNAL_SERVER_ERROR,
`Error: documentStoreServices.deleteDocumentStore - ${getErrorMessage(error)}`
)
}
}
const deleteDocumentStoreFileChunk = async (storeId: string, docId: string, chunkId: string) => {
try {
const appServer = getRunningExpressApp()
const entity = await appServer.AppDataSource.getRepository(DocumentStore).findOneBy({
id: storeId
})
if (!entity) {
throw new InternalFlowiseError(StatusCodes.NOT_FOUND, `Document store ${storeId} not found`)
}
const loaders = JSON.parse(entity.loaders)
const found = loaders.find((ldr: IDocumentStoreLoader) => ldr.id === docId)
if (!found) {
throw new InternalFlowiseError(StatusCodes.NOT_FOUND, `Document store loader ${docId} not found`)
}
const tbdChunk = await appServer.AppDataSource.getRepository(DocumentStoreFileChunk).findOneBy({
id: chunkId
})
if (!tbdChunk) {
throw new InternalFlowiseError(StatusCodes.NOT_FOUND, `Document Chunk ${chunkId} not found`)
}
await appServer.AppDataSource.getRepository(DocumentStoreFileChunk).delete(chunkId)
found.totalChunks--
found.totalChars -= tbdChunk.pageContent.length
entity.loaders = JSON.stringify(loaders)
await appServer.AppDataSource.getRepository(DocumentStore).save(entity)
return getDocumentStoreFileChunks(storeId, docId)
} catch (error) {
throw new InternalFlowiseError(
StatusCodes.INTERNAL_SERVER_ERROR,
`Error: documentStoreServices.deleteDocumentStoreFileChunk - ${getErrorMessage(error)}`
)
}
}
const editDocumentStoreFileChunk = async (storeId: string, docId: string, chunkId: string, content: string) => {
try {
const appServer = getRunningExpressApp()
const entity = await appServer.AppDataSource.getRepository(DocumentStore).findOneBy({
id: storeId
})
if (!entity) {
throw new InternalFlowiseError(StatusCodes.NOT_FOUND, `Document store ${storeId} not found`)
}
const loaders = JSON.parse(entity.loaders)
const found = loaders.find((ldr: IDocumentStoreLoader) => ldr.id === docId)
if (!found) {
throw new InternalFlowiseError(StatusCodes.NOT_FOUND, `Document store loader ${docId} not found`)
}
const editChunk = await appServer.AppDataSource.getRepository(DocumentStoreFileChunk).findOneBy({
id: chunkId
})
if (!editChunk) {
throw new InternalFlowiseError(StatusCodes.NOT_FOUND, `Document Chunk ${chunkId} not found`)
}
found.totalChars -= editChunk.pageContent.length
editChunk.pageContent = content
found.totalChars += content.length
await appServer.AppDataSource.getRepository(DocumentStoreFileChunk).save(editChunk)
entity.loaders = JSON.stringify(loaders)
await appServer.AppDataSource.getRepository(DocumentStore).save(entity)
return getDocumentStoreFileChunks(storeId, docId)
} catch (error) {
throw new InternalFlowiseError(
StatusCodes.INTERNAL_SERVER_ERROR,
`Error: documentStoreServices.editDocumentStoreFileChunk - ${getErrorMessage(error)}`
)
}
}
// Update documentStore
const updateDocumentStore = async (documentStore: DocumentStore, updatedDocumentStore: DocumentStore) => {
try {
const appServer = getRunningExpressApp()
const tmpUpdatedDocumentStore = appServer.AppDataSource.getRepository(DocumentStore).merge(documentStore, updatedDocumentStore)
const dbResponse = await appServer.AppDataSource.getRepository(DocumentStore).save(tmpUpdatedDocumentStore)
return dbResponse
} catch (error) {
throw new InternalFlowiseError(
StatusCodes.INTERNAL_SERVER_ERROR,
`Error: documentStoreServices.updateDocumentStore - ${getErrorMessage(error)}`
)
}
}
const _saveFileToStorage = async (fileBase64: string, entity: DocumentStore) => {
const splitDataURI = fileBase64.split(',')
const filename = splitDataURI.pop()?.split(':')[1] ?? ''
const bf = Buffer.from(splitDataURI.pop() || '', 'base64')
const mimePrefix = splitDataURI.pop()
let mime = ''
if (mimePrefix) {
mime = mimePrefix.split(';')[0].split(':')[1]
}
await addFileToStorage(mime, bf, filename, DOCUMENT_STORE_BASE_FOLDER, entity.id)
return {
id: uuidv4(),
name: filename,
mimePrefix: mime,
size: bf.length,
status: DocumentStoreStatus.NEW,
uploaded: new Date()
}
}
const _splitIntoChunks = async (data: IDocumentStoreLoaderForPreview) => {
try {
const appServer = getRunningExpressApp()
let splitterInstance = null
if (data.splitterConfig && Object.keys(data.splitterConfig).length > 0) {
const nodeInstanceFilePath = appServer.nodesPool.componentNodes[data.splitterId].filePath as string
const nodeModule = await import(nodeInstanceFilePath)
const newNodeInstance = new nodeModule.nodeClass()
let nodeData = {
inputs: { ...data.splitterConfig },
id: 'splitter_0'
}
splitterInstance = await newNodeInstance.init(nodeData)
}
const nodeInstanceFilePath = appServer.nodesPool.componentNodes[data.loaderId].filePath as string
const nodeModule = await import(nodeInstanceFilePath)
// doc loader configs
const nodeData = {
credential: data.credential || undefined,
inputs: { ...data.loaderConfig, textSplitter: splitterInstance },
outputs: { output: 'document' }
}
const options: ICommonObject = {
chatflowid: uuidv4(),
appDataSource: appServer.AppDataSource,
databaseEntities,
logger
}
const docNodeInstance = new nodeModule.nodeClass()
let docs: IDocument[] = await docNodeInstance.init(nodeData, '', options)
return docs
} catch (error) {
throw new InternalFlowiseError(
StatusCodes.INTERNAL_SERVER_ERROR,
`Error: documentStoreServices.splitIntoChunks - ${getErrorMessage(error)}`
)
}
}
const _normalizeFilePaths = async (data: IDocumentStoreLoaderForPreview, entity: DocumentStore | null) => {
const keys = Object.getOwnPropertyNames(data.loaderConfig)
let rehydrated = false
for (let i = 0; i < keys.length; i++) {
const input = data.loaderConfig[keys[i]]
if (!input) {
continue
}
if (typeof input !== 'string') {
continue
}
let documentStoreEntity: DocumentStore | null = entity
if (input.startsWith('FILE-STORAGE::')) {
if (!documentStoreEntity) {
const appServer = getRunningExpressApp()
documentStoreEntity = await appServer.AppDataSource.getRepository(DocumentStore).findOneBy({
id: data.storeId
})
if (!documentStoreEntity) {
throw new InternalFlowiseError(StatusCodes.NOT_FOUND, `Document store ${data.storeId} not found`)
}
}
const fileName = input.replace('FILE-STORAGE::', '')
let files: string[] = []
if (fileName.startsWith('[') && fileName.endsWith(']')) {
files = JSON.parse(fileName)
} else {
files = [fileName]
}
const loaders = JSON.parse(documentStoreEntity.loaders)
const currentLoader = loaders.find((ldr: IDocumentStoreLoader) => ldr.id === data.id)
if (currentLoader) {
const base64Files: string[] = []
for (const file of files) {
const bf = await getFileFromStorage(file, DOCUMENT_STORE_BASE_FOLDER, documentStoreEntity.id)
// find the file entry that has the same name as the file
const uploadedFile = currentLoader.files.find((uFile: IDocumentStoreLoaderFile) => uFile.name === file)
const mimePrefix = 'data:' + uploadedFile.mimePrefix + ';base64'
const base64String = mimePrefix + ',' + bf.toString('base64') + `,filename:${file}`
base64Files.push(base64String)
}
data.loaderConfig[keys[i]] = JSON.stringify(base64Files)
rehydrated = true
}
}
}
data.rehydrated = rehydrated
}
const previewChunks = async (data: IDocumentStoreLoaderForPreview) => {
try {
if (data.preview) {
if (
data.loaderId === 'cheerioWebScraper' ||
data.loaderId === 'puppeteerWebScraper' ||
data.loaderId === 'playwrightWebScraper'
) {
data.loaderConfig['limit'] = 3
}
}
if (!data.rehydrated) {
await _normalizeFilePaths(data, null)
}
let docs = await _splitIntoChunks(data)
const totalChunks = docs.length
// if -1, return all chunks
if (data.previewChunkCount === -1) data.previewChunkCount = totalChunks
// return all docs if the user ask for more than we have
if (totalChunks <= data.previewChunkCount) data.previewChunkCount = totalChunks
// return only the first n chunks
if (totalChunks > data.previewChunkCount) docs = docs.slice(0, data.previewChunkCount)
return { chunks: docs, totalChunks: totalChunks, previewChunkCount: data.previewChunkCount }
} catch (error) {
throw new InternalFlowiseError(
StatusCodes.INTERNAL_SERVER_ERROR,
`Error: documentStoreServices.previewChunks - ${getErrorMessage(error)}`
)
}
}
const processAndSaveChunks = async (data: IDocumentStoreLoaderForPreview) => {
try {
const appServer = getRunningExpressApp()
const entity = await appServer.AppDataSource.getRepository(DocumentStore).findOneBy({
id: data.storeId
})
if (!entity) {
throw new InternalFlowiseError(
StatusCodes.NOT_FOUND,
`Error: documentStoreServices.processAndSaveChunks - Document store ${data.storeId} not found`
)
}
const newLoaderId = data.id ?? uuidv4()
const existingLoaders = JSON.parse(entity.loaders)
const found = existingLoaders.find((ldr: IDocumentStoreLoader) => ldr.id === newLoaderId)
if (found) {
// clean up the current status and mark the loader as pending_sync
found.totalChunks = 0
found.totalChars = 0
found.status = DocumentStoreStatus.SYNCING
entity.loaders = JSON.stringify(existingLoaders)
} else {
let loader: IDocumentStoreLoader = {
id: newLoaderId,
loaderId: data.loaderId,
loaderName: data.loaderName,
loaderConfig: data.loaderConfig,
splitterId: data.splitterId,
splitterName: data.splitterName,
splitterConfig: data.splitterConfig,
totalChunks: 0,
totalChars: 0,
status: DocumentStoreStatus.SYNCING
}
if (data.credential) {
loader.credential = data.credential
}
existingLoaders.push(loader)
entity.loaders = JSON.stringify(existingLoaders)
}
await appServer.AppDataSource.getRepository(DocumentStore).save(entity)
// this method will run async, will have to be moved to a worker thread
_saveChunksToStorage(data, entity, newLoaderId).then(() => {})
return getDocumentStoreFileChunks(data.storeId as string, newLoaderId)
} catch (error) {
throw new InternalFlowiseError(
StatusCodes.INTERNAL_SERVER_ERROR,
`Error: documentStoreServices.processAndSaveChunks - ${getErrorMessage(error)}`
)
}
}
const _saveChunksToStorage = async (data: IDocumentStoreLoaderForPreview, entity: DocumentStore, newLoaderId: string) => {
const re = new RegExp('^data.*;base64', 'i')
try {
const appServer = getRunningExpressApp()
//step 1: restore the full paths, if any
await _normalizeFilePaths(data, entity)
//step 2: split the file into chunks
previewChunks(data).then(async (response) => {
//{ chunks: docs, totalChunks: totalChunks, previewChunkCount: data.previewChunkCount }
//step 3: remove base64 files and save them to storage, this needs to be rewritten
let filesWithMetadata = []
const keys = Object.getOwnPropertyNames(data.loaderConfig)
for (let i = 0; i < keys.length; i++) {
const input = data.loaderConfig[keys[i]]
if (!input) {
continue
}
if (typeof input !== 'string') {
continue
}
if (input.startsWith('[') && input.endsWith(']')) {
const files = JSON.parse(input)
const fileNames: string[] = []
for (let j = 0; j < files.length; j++) {
const file = files[j]
if (re.test(file)) {
const fileMetadata = await _saveFileToStorage(file, entity)
fileNames.push(fileMetadata.name)
filesWithMetadata.push(fileMetadata)
}
}
data.loaderConfig[keys[i]] = 'FILE-STORAGE::' + JSON.stringify(fileNames)
} else if (re.test(input)) {
const fileNames: string[] = []
const fileMetadata = await _saveFileToStorage(input, entity)
fileNames.push(fileMetadata.name)
filesWithMetadata.push(fileMetadata)
data.loaderConfig[keys[i]] = 'FILE-STORAGE::' + JSON.stringify(fileNames)
break
}
}
const existingLoaders = JSON.parse(entity.loaders)
const loader = existingLoaders.find((ldr: IDocumentStoreLoader) => ldr.id === newLoaderId)
if (data.id) {
//step 4: remove all files and chunks associated with the previous loader
const index = existingLoaders.indexOf(loader)
if (index > -1) {
existingLoaders.splice(index, 1)
if (!data.rehydrated) {
if (loader.files) {
loader.files.map(async (file: IDocumentStoreLoaderFile) => {
await removeSpecificFileFromStorage(DOCUMENT_STORE_BASE_FOLDER, entity.id, file.name)
})
}
}
}
}
//step 5: upload with the new files and loaderConfig
if (filesWithMetadata.length > 0) {
loader.loaderConfig = data.loaderConfig
loader.files = filesWithMetadata
}
//step 6: update the loaders with the new loaderConfig
if (data.id) {
existingLoaders.push(loader)
}
//step 7: remove all previous chunks
await appServer.AppDataSource.getRepository(DocumentStoreFileChunk).delete({ docId: newLoaderId })
if (response.chunks) {
//step 8: now save the new chunks
const totalChars = response.chunks.reduce((acc: number, chunk) => acc + chunk.pageContent.length, 0)
response.chunks.map(async (chunk: IDocument, index: number) => {
const docChunk: DocumentStoreFileChunk = {
docId: newLoaderId,
storeId: data.storeId || '',
id: uuidv4(),
chunkNo: index + 1,
pageContent: chunk.pageContent,
metadata: JSON.stringify(chunk.metadata)
}
const dChunk = appServer.AppDataSource.getRepository(DocumentStoreFileChunk).create(docChunk)
await appServer.AppDataSource.getRepository(DocumentStoreFileChunk).save(dChunk)
})
// update the loader with the new metrics
loader.totalChunks = response.totalChunks
loader.totalChars = totalChars
}
loader.status = 'SYNC'
// have a flag and iterate over the loaders and update the entity status to SYNC
const allSynced = existingLoaders.every((ldr: IDocumentStoreLoader) => ldr.status === 'SYNC')
entity.status = allSynced ? DocumentStoreStatus.SYNC : DocumentStoreStatus.STALE
entity.loaders = JSON.stringify(existingLoaders)
//step 9: update the entity in the database
await appServer.AppDataSource.getRepository(DocumentStore).save(entity)
return
})
} catch (error) {
throw new InternalFlowiseError(
StatusCodes.INTERNAL_SERVER_ERROR,
`Error: documentStoreServices._saveChunksToStorage - ${getErrorMessage(error)}`
)
}
}
// Get all component nodes
const getDocumentLoaders = async () => {
const removeDocumentLoadersWithName = ['documentStore', 'vectorStoreToDocument', 'unstructuredFolderLoader', 'folderFiles']
try {
const dbResponse = await nodesService.getAllNodesForCategory('Document Loaders')
return dbResponse.filter((node) => !removeDocumentLoadersWithName.includes(node.name))
} catch (error) {
throw new InternalFlowiseError(
StatusCodes.INTERNAL_SERVER_ERROR,
`Error: documentStoreServices.getDocumentLoaders - ${getErrorMessage(error)}`
)
}
}
const updateDocumentStoreUsage = async (chatId: string, storeId: string | undefined) => {
try {
// find the document store
const appServer = getRunningExpressApp()
// find all entities that have the chatId in their whereUsed
const entities = await appServer.AppDataSource.getRepository(DocumentStore).find()
entities.map(async (entity: DocumentStore) => {
const whereUsed = JSON.parse(entity.whereUsed)
const found = whereUsed.find((w: string) => w === chatId)
if (found) {
if (!storeId) {
// remove the chatId from the whereUsed, as the store is being deleted
const index = whereUsed.indexOf(chatId)
if (index > -1) {
whereUsed.splice(index, 1)
entity.whereUsed = JSON.stringify(whereUsed)
await appServer.AppDataSource.getRepository(DocumentStore).save(entity)
}
} else if (entity.id === storeId) {
// do nothing, already found and updated
} else if (entity.id !== storeId) {
// remove the chatId from the whereUsed, as a new store is being used
const index = whereUsed.indexOf(chatId)
if (index > -1) {
whereUsed.splice(index, 1)
entity.whereUsed = JSON.stringify(whereUsed)
await appServer.AppDataSource.getRepository(DocumentStore).save(entity)
}
}
} else {
if (entity.id === storeId) {
// add the chatId to the whereUsed
whereUsed.push(chatId)
entity.whereUsed = JSON.stringify(whereUsed)
await appServer.AppDataSource.getRepository(DocumentStore).save(entity)
}
}
})
} catch (error) {
throw new InternalFlowiseError(
StatusCodes.INTERNAL_SERVER_ERROR,
`Error: documentStoreServices.updateDocumentStoreUsage - ${getErrorMessage(error)}`
)
}
}
export default {
updateDocumentStoreUsage,
deleteDocumentStore,
createDocumentStore,
deleteLoaderFromDocumentStore,
getAllDocumentStores,
getDocumentStoreById,
getUsedChatflowNames,
getDocumentStoreFileChunks,
updateDocumentStore,
previewChunks,
processAndSaveChunks,
deleteDocumentStoreFileChunk,
editDocumentStoreFileChunk,
getDocumentLoaders
}
+23 -1
View File
@@ -23,6 +23,27 @@ const getAllNodes = async () => {
}
}
// Get all component nodes for a specific category
const getAllNodesForCategory = async (category: string) => {
try {
const appServer = getRunningExpressApp()
const dbResponse = []
for (const nodeName in appServer.nodesPool.componentNodes) {
const componentNode = appServer.nodesPool.componentNodes[nodeName]
if (componentNode.category === category) {
const clonedNode = cloneDeep(componentNode)
dbResponse.push(clonedNode)
}
}
return dbResponse
} catch (error) {
throw new InternalFlowiseError(
StatusCodes.INTERNAL_SERVER_ERROR,
`Error: nodesService.getAllNodesForCategory - ${getErrorMessage(error)}`
)
}
}
// Get specific component node via name
const getNodeByName = async (nodeName: string) => {
try {
@@ -138,5 +159,6 @@ export default {
getNodeByName,
getSingleNodeIcon,
getSingleNodeAsyncOptions,
executeCustomFunction
executeCustomFunction,
getAllNodesForCategory
}