Merge pull request #1243 from FlowiseAI/feature/VectorStoreRevamp

Feature/add vector upsert ability
This commit is contained in:
Henry Heng
2023-11-23 17:08:51 +00:00
committed by GitHub
114 changed files with 10368 additions and 4786 deletions
+1
View File
@@ -166,6 +166,7 @@ export interface IncomingInput {
overrideConfig?: ICommonObject
socketIOClientId?: string
chatId?: string
stopNodeId?: string
}
export interface IActiveChatflows {
+34 -9
View File
@@ -135,6 +135,7 @@ export class App {
'/api/v1/chatflows/apikey/',
'/api/v1/public-chatflows',
'/api/v1/prediction/',
'/api/v1/vector/upsert/',
'/api/v1/node-icon/',
'/api/v1/components-credentials-icon/',
'/api/v1/chatflows-streaming',
@@ -1075,6 +1076,23 @@ export class App {
return res.status(201).send('OK')
})
// ----------------------------------------
// Upsert
// ----------------------------------------
this.app.post(
'/api/v1/vector/upsert/:id',
upload.array('files'),
(req: Request, res: Response, next: NextFunction) => getRateLimiter(req, res, next),
async (req: Request, res: Response) => {
await this.buildChatflow(req, res, undefined, false, true)
}
)
this.app.post('/api/v1/vector/internal-upsert/:id', async (req: Request, res: Response) => {
await this.buildChatflow(req, res, undefined, true, true)
})
// ----------------------------------------
// Prediction
// ----------------------------------------
@@ -1085,13 +1103,13 @@ export class App {
upload.array('files'),
(req: Request, res: Response, next: NextFunction) => getRateLimiter(req, res, next),
async (req: Request, res: Response) => {
await this.processPrediction(req, res, socketIO)
await this.buildChatflow(req, res, socketIO)
}
)
// Send input message and get prediction result (Internal)
this.app.post('/api/v1/internal-prediction/:id', async (req: Request, res: Response) => {
await this.processPrediction(req, res, socketIO, true)
await this.buildChatflow(req, res, socketIO, true)
})
// ----------------------------------------
@@ -1321,13 +1339,14 @@ export class App {
}
/**
* Process Prediction
* Build Chatflow
* @param {Request} req
* @param {Response} res
* @param {Server} socketIO
* @param {boolean} isInternal
* @param {boolean} isUpsert
*/
async processPrediction(req: Request, res: Response, socketIO?: Server, isInternal: boolean = false) {
async buildChatflow(req: Request, res: Response, socketIO?: Server, isInternal: boolean = false, isUpsert: boolean = false) {
try {
const chatflowid = req.params.id
let incomingInput: IncomingInput = req.body
@@ -1368,7 +1387,8 @@ export class App {
question: req.body.question ?? 'hello',
overrideConfig,
history: [],
socketIOClientId: req.body.socketIOClientId
socketIOClientId: req.body.socketIOClientId,
stopNodeId: req.body.stopNodeId
}
}
@@ -1393,7 +1413,8 @@ export class App {
this.chatflowPool.activeChatflows[chatflowid].overrideConfig,
incomingInput.overrideConfig
) &&
!isStartNodeDependOnInput(this.chatflowPool.activeChatflows[chatflowid].startingNodes, nodes)
!isStartNodeDependOnInput(this.chatflowPool.activeChatflows[chatflowid].startingNodes, nodes) &&
!isUpsert
)
}
@@ -1413,14 +1434,15 @@ export class App {
const endingNodeData = nodes.find((nd) => nd.id === endingNodeId)?.data
if (!endingNodeData) return res.status(500).send(`Ending node ${endingNodeId} data not found`)
if (endingNodeData && endingNodeData.category !== 'Chains' && endingNodeData.category !== 'Agents') {
if (endingNodeData && endingNodeData.category !== 'Chains' && endingNodeData.category !== 'Agents' && !isUpsert) {
return res.status(500).send(`Ending node must be either a Chain or Agent`)
}
if (
endingNodeData.outputs &&
Object.keys(endingNodeData.outputs).length &&
!Object.values(endingNodeData.outputs).includes(endingNodeData.name)
!Object.values(endingNodeData.outputs).includes(endingNodeData.name) &&
!isUpsert
) {
return res
.status(500)
@@ -1450,8 +1472,11 @@ export class App {
chatflowid,
this.AppDataSource,
incomingInput?.overrideConfig,
this.cachePool
this.cachePool,
isUpsert,
incomingInput.stopNodeId
)
if (isUpsert) return res.status(201).send('Successfully Upserted')
const nodeToExecute = reactFlowNodes.find((node: IReactFlowNode) => node.id === endingNodeId)
if (!nodeToExecute) return res.status(404).send(`Node ${endingNodeId} not found`)
+30 -11
View File
@@ -222,7 +222,9 @@ export const buildLangchain = async (
chatflowid: string,
appDataSource: DataSource,
overrideConfig?: ICommonObject,
cachePool?: CachePool
cachePool?: CachePool,
isUpsert?: boolean,
stopNodeId?: string
) => {
const flowNodes = cloneDeep(reactFlowNodes)
@@ -254,16 +256,33 @@ export const buildLangchain = async (
if (overrideConfig) flowNodeData = replaceInputsWithConfig(flowNodeData, overrideConfig)
const reactFlowNodeData: INodeData = resolveVariables(flowNodeData, flowNodes, question, chatHistory)
logger.debug(`[server]: Initializing ${reactFlowNode.data.label} (${reactFlowNode.data.id})`)
flowNodes[nodeIndex].data.instance = await newNodeInstance.init(reactFlowNodeData, question, {
chatId,
chatflowid,
appDataSource,
databaseEntities,
logger,
cachePool
})
logger.debug(`[server]: Finished initializing ${reactFlowNode.data.label} (${reactFlowNode.data.id})`)
if (
isUpsert &&
((stopNodeId && reactFlowNodeData.id === stopNodeId) || (!stopNodeId && reactFlowNodeData.category === 'Vector Stores'))
) {
logger.debug(`[server]: Upserting ${reactFlowNode.data.label} (${reactFlowNode.data.id})`)
await newNodeInstance.vectorStoreMethods!['upsert']!.call(newNodeInstance, reactFlowNodeData, {
chatId,
chatflowid,
appDataSource,
databaseEntities,
logger,
cachePool
})
logger.debug(`[server]: Finished upserting ${reactFlowNode.data.label} (${reactFlowNode.data.id})`)
break
} else {
logger.debug(`[server]: Initializing ${reactFlowNode.data.label} (${reactFlowNode.data.id})`)
flowNodes[nodeIndex].data.instance = await newNodeInstance.init(reactFlowNodeData, question, {
chatId,
chatflowid,
appDataSource,
databaseEntities,
logger,
cachePool
})
logger.debug(`[server]: Finished initializing ${reactFlowNode.data.label} (${reactFlowNode.data.id})`)
}
} catch (e: any) {
logger.error(e)
throw new Error(e)