Merge branch 'main' into FEATURE/Vision

# Conflicts:
#	packages/server/src/index.ts
This commit is contained in:
Henry
2024-01-15 19:20:04 +00:00
51 changed files with 2231 additions and 1486 deletions
+47 -82
View File
@@ -20,7 +20,6 @@ import {
ICredentialReturnResponse,
chatType,
IChatMessage,
IReactFlowEdge,
IDepthQueue,
INodeDirectedGraph
} from './Interface'
@@ -39,14 +38,14 @@ import {
databaseEntities,
transformToCredentialEntity,
decryptCredentialData,
clearAllSessionMemory,
replaceInputsWithConfig,
getEncryptionKey,
checkMemorySessionId,
clearSessionMemoryFromViewMessageDialog,
getMemorySessionId,
getUserHome,
replaceChatHistory,
getAllConnectedNodes
getSessionChatHistory,
getAllConnectedNodes,
clearSessionMemory,
findMemoryNode
} from './utils'
import { cloneDeep, omit, uniqWith, isEqual } from 'lodash'
import { getDataSource } from './DataSource'
@@ -362,7 +361,8 @@ export class App {
const chatflow = await this.AppDataSource.getRepository(ChatFlow).findOneBy({
id: req.params.id
})
if (chatflow && chatflow.chatbotConfig) {
if (!chatflow) return res.status(404).send(`Chatflow ${req.params.id} not found`)
if (chatflow.chatbotConfig) {
try {
const parsedConfig = JSON.parse(chatflow.chatbotConfig)
return res.json(parsedConfig)
@@ -370,7 +370,7 @@ export class App {
return res.status(500).send(`Error parsing Chatbot Config for Chatflow ${req.params.id}`)
}
}
return res.status(404).send(`Chatbot Config for Chatflow ${req.params.id} not found`)
return res.status(200).send('OK')
})
// Save chatflow
@@ -533,7 +533,7 @@ export class App {
res.status(404).send(`Chatflow ${chatflowid} not found`)
return
}
const chatId = (req.query?.chatId as string) ?? (await getChatId(chatflowid))
const chatId = req.query?.chatId as string
const memoryType = req.query?.memoryType as string | undefined
const sessionId = req.query?.sessionId as string | undefined
const chatType = req.query?.chatType as string | undefined
@@ -543,20 +543,22 @@ export class App {
const parsedFlowData: IReactFlowObject = JSON.parse(flowData)
const nodes = parsedFlowData.nodes
if (isClearFromViewMessageDialog) {
await clearSessionMemoryFromViewMessageDialog(
try {
await clearSessionMemory(
nodes,
this.nodesPool.componentNodes,
chatId,
this.AppDataSource,
sessionId,
memoryType
memoryType,
isClearFromViewMessageDialog
)
} else {
await clearAllSessionMemory(nodes, this.nodesPool.componentNodes, chatId, this.AppDataSource, sessionId)
} catch (e) {
return res.status(500).send('Error clearing chat messages')
}
const deleteOptions: FindOptionsWhere<ChatMessage> = { chatflowid, chatId }
const deleteOptions: FindOptionsWhere<ChatMessage> = { chatflowid }
if (chatId) deleteOptions.chatId = chatId
if (memoryType) deleteOptions.memoryType = memoryType
if (sessionId) deleteOptions.sessionId = sessionId
if (chatType) deleteOptions.chatType = chatType
@@ -644,7 +646,7 @@ export class App {
return res.json(result)
})
// Delete all chatmessages from chatflowid
// Delete all credentials from chatflowid
this.app.delete('/api/v1/credentials/:id', async (req: Request, res: Response) => {
const results = await this.AppDataSource.getRepository(Credential).delete({ id: req.params.id })
return res.json(results)
@@ -1454,26 +1456,6 @@ export class App {
return await this.AppDataSource.getRepository(ChatMessage).save(chatmessage)
}
/**
* Method that find memory label that is connected within chatflow
* In a chatflow, there should only be 1 memory node
* @param {IReactFlowNode[]} nodes
* @param {IReactFlowEdge[]} edges
* @returns {string | undefined}
*/
findMemoryLabel(nodes: IReactFlowNode[], edges: IReactFlowEdge[]): IReactFlowNode | undefined {
const memoryNodes = nodes.filter((node) => node.data.category === 'Memory')
const memoryNodeIds = memoryNodes.map((mem) => mem.data.id)
for (const edge of edges) {
if (memoryNodeIds.includes(edge.source)) {
const memoryNode = nodes.find((node) => node.data.id === edge.source)
return memoryNode
}
}
return undefined
}
async upsertVector(req: Request, res: Response, isInternal: boolean = false) {
try {
const chatflowid = req.params.id
@@ -1663,7 +1645,6 @@ export class App {
* - Still in sync (i.e the flow has not been modified since)
* - Existing overrideConfig and new overrideConfig are the same
* - Flow doesn't start with/contain nodes that depend on incomingInput.question
* - Its not an Upsert request
* TODO: convert overrideConfig to hash when we no longer store base64 string but filepath
***/
const isFlowReusable = () => {
@@ -1719,22 +1700,28 @@ export class App {
isStreamValid = isFlowValidForStream(nodes, endingNodeData)
}
let chatHistory: IMessage[] | string = incomingInput.history
let chatHistory: IMessage[] = incomingInput.history ?? []
// When {{chat_history}} is used in Prompt Template, fetch the chat conversations from memory
// When {{chat_history}} is used in Prompt Template, fetch the chat conversations from memory node
for (const endingNode of endingNodes) {
const endingNodeData = endingNode.data
if (!endingNodeData.inputs?.memory) continue
if (
endingNodeData.inputs?.memory &&
!incomingInput.history &&
(incomingInput.chatId || incomingInput.overrideConfig?.sessionId)
) {
const memoryNodeId = endingNodeData.inputs?.memory.split('.')[0].replace('{{', '')
const memoryNode = nodes.find((node) => node.data.id === memoryNodeId)
if (memoryNode) {
chatHistory = await replaceChatHistory(memoryNode, incomingInput, this.AppDataSource, databaseEntities, logger)
}
const memoryNodeId = endingNodeData.inputs?.memory.split('.')[0].replace('{{', '')
const memoryNode = nodes.find((node) => node.data.id === memoryNodeId)
if (!memoryNode) continue
if (!chatHistory.length && (incomingInput.chatId || incomingInput.overrideConfig?.sessionId)) {
chatHistory = await getSessionChatHistory(
memoryNode,
this.nodesPool.componentNodes,
incomingInput,
this.AppDataSource,
databaseEntities,
logger
)
}
}
@@ -1793,16 +1780,11 @@ export class App {
logger.debug(`[server]: Running ${nodeToExecuteData.label} (${nodeToExecuteData.id})`)
let sessionId = undefined
if (nodeToExecuteData.instance) sessionId = checkMemorySessionId(nodeToExecuteData.instance, chatId)
const memoryNode = this.findMemoryLabel(nodes, edges)
const memoryNode = findMemoryNode(nodes, edges)
const memoryType = memoryNode?.data.label
let chatHistory: IMessage[] | string = incomingInput.history
if (memoryNode && !incomingInput.history && (incomingInput.chatId || incomingInput.overrideConfig?.sessionId)) {
chatHistory = await replaceChatHistory(memoryNode, incomingInput, this.AppDataSource, databaseEntities, logger)
}
let sessionId = undefined
if (memoryNode) sessionId = getMemorySessionId(memoryNode, incomingInput, chatId, isInternal)
const nodeInstanceFilePath = this.nodesPool.componentNodes[nodeToExecuteData.name].filePath as string
const nodeModule = await import(nodeInstanceFilePath)
@@ -1810,26 +1792,26 @@ export class App {
let result = isStreamValid
? await nodeInstance.run(nodeToExecuteData, incomingInput.question, {
uploads: incomingInput.uploads,
chatId,
chatflowid,
chatHistory,
socketIO,
socketIOClientId: incomingInput.socketIOClientId,
chatHistory: incomingInput.history,
logger,
appDataSource: this.AppDataSource,
databaseEntities,
analytic: chatflow.analytic,
chatId
uploads: incomingInput.uploads,
socketIO,
socketIOClientId: incomingInput.socketIOClientId
})
: await nodeInstance.run(nodeToExecuteData, incomingInput.question, {
chatId,
chatflowid,
uploads: incomingInput.uploads,
chatHistory,
chatHistory: incomingInput.history,
logger,
appDataSource: this.AppDataSource,
databaseEntities,
analytic: chatflow.analytic,
chatId
uploads: incomingInput.uploads
})
result = typeof result === 'string' ? { text: result } : result
@@ -1893,23 +1875,6 @@ export class App {
}
}
/**
* Get first chat message id
* @param {string} chatflowid
* @returns {string}
*/
export async function getChatId(chatflowid: string): Promise<string> {
// first chatmessage id as the unique chat id
const firstChatMessage = await getDataSource()
.getRepository(ChatMessage)
.createQueryBuilder('cm')
.select('cm.id')
.where('chatflowid = :chatflowid', { chatflowid })
.orderBy('cm.createdDate', 'ASC')
.getOne()
return firstChatMessage ? firstChatMessage.id : ''
}
let serverApp: App | undefined
export async function getAllChatFlow(): Promise<IChatFlow[]> {
+102 -73
View File
@@ -26,7 +26,8 @@ import {
getEncryptionKeyPath,
ICommonObject,
IDatabaseEntity,
IMessage
IMessage,
FlowiseMemory
} from 'flowise-components'
import { randomBytes } from 'crypto'
import { AES, enc } from 'crypto-js'
@@ -270,7 +271,7 @@ export const buildLangchain = async (
depthQueue: IDepthQueue,
componentNodes: IComponentNodes,
question: string,
chatHistory: IMessage[] | string,
chatHistory: IMessage[],
chatId: string,
chatflowid: string,
appDataSource: DataSource,
@@ -317,9 +318,10 @@ export const buildLangchain = async (
await newNodeInstance.vectorStoreMethods!['upsert']!.call(newNodeInstance, reactFlowNodeData, {
chatId,
chatflowid,
chatHistory,
logger,
appDataSource,
databaseEntities,
logger,
cachePool,
dynamicVariables
})
@@ -330,9 +332,10 @@ export const buildLangchain = async (
let outputResult = await newNodeInstance.init(reactFlowNodeData, question, {
chatId,
chatflowid,
chatHistory,
logger,
appDataSource,
databaseEntities,
logger,
cachePool,
dynamicVariables
})
@@ -424,66 +427,52 @@ export const buildLangchain = async (
}
/**
* Clear all session memories on the canvas
* @param {IReactFlowNode[]} reactFlowNodes
* @param {IComponentNodes} componentNodes
* @param {string} chatId
* @param {DataSource} appDataSource
* @param {string} sessionId
*/
export const clearAllSessionMemory = async (
reactFlowNodes: IReactFlowNode[],
componentNodes: IComponentNodes,
chatId: string,
appDataSource: DataSource,
sessionId?: string
) => {
for (const node of reactFlowNodes) {
if (node.data.category !== 'Memory' && node.data.type !== 'OpenAIAssistant') continue
const nodeInstanceFilePath = componentNodes[node.data.name].filePath as string
const nodeModule = await import(nodeInstanceFilePath)
const newNodeInstance = new nodeModule.nodeClass()
if (sessionId && node.data.inputs) {
node.data.inputs.sessionId = sessionId
}
if (newNodeInstance.memoryMethods && newNodeInstance.memoryMethods.clearSessionMemory) {
await newNodeInstance.memoryMethods.clearSessionMemory(node.data, { chatId, appDataSource, databaseEntities, logger })
}
}
}
/**
* Clear specific session memory from View Message Dialog UI
* Clear session memories
* @param {IReactFlowNode[]} reactFlowNodes
* @param {IComponentNodes} componentNodes
* @param {string} chatId
* @param {DataSource} appDataSource
* @param {string} sessionId
* @param {string} memoryType
* @param {string} isClearFromViewMessageDialog
*/
export const clearSessionMemoryFromViewMessageDialog = async (
export const clearSessionMemory = async (
reactFlowNodes: IReactFlowNode[],
componentNodes: IComponentNodes,
chatId: string,
appDataSource: DataSource,
sessionId?: string,
memoryType?: string
memoryType?: string,
isClearFromViewMessageDialog?: string
) => {
if (!sessionId) return
for (const node of reactFlowNodes) {
if (node.data.category !== 'Memory' && node.data.type !== 'OpenAIAssistant') continue
if (memoryType && node.data.label !== memoryType) continue
// Only clear specific session memory from View Message Dialog UI
if (isClearFromViewMessageDialog && memoryType && node.data.label !== memoryType) continue
const nodeInstanceFilePath = componentNodes[node.data.name].filePath as string
const nodeModule = await import(nodeInstanceFilePath)
const newNodeInstance = new nodeModule.nodeClass()
const options: ICommonObject = { chatId, appDataSource, databaseEntities, logger }
if (sessionId && node.data.inputs) node.data.inputs.sessionId = sessionId
if (newNodeInstance.memoryMethods && newNodeInstance.memoryMethods.clearSessionMemory) {
await newNodeInstance.memoryMethods.clearSessionMemory(node.data, { chatId, appDataSource, databaseEntities, logger })
return
// SessionId always take priority first because it is the sessionId used for 3rd party memory node
if (sessionId && node.data.inputs) {
if (node.data.type === 'OpenAIAssistant') {
await newNodeInstance.clearChatMessages(node.data, options, { type: 'threadId', id: sessionId })
} else {
node.data.inputs.sessionId = sessionId
const initializedInstance: FlowiseMemory = await newNodeInstance.init(node.data, '', options)
await initializedInstance.clearChatMessages(sessionId)
}
} else if (chatId && node.data.inputs) {
if (node.data.type === 'OpenAIAssistant') {
await newNodeInstance.clearChatMessages(node.data, options, { type: 'chatId', id: chatId })
} else {
node.data.inputs.sessionId = chatId
const initializedInstance: FlowiseMemory = await newNodeInstance.init(node.data, '', options)
await initializedInstance.clearChatMessages(chatId)
}
}
}
}
@@ -500,7 +489,7 @@ export const getVariableValue = (
paramValue: string,
reactFlowNodes: IReactFlowNode[],
question: string,
chatHistory: IMessage[] | string,
chatHistory: IMessage[],
isAcceptVariable = false
) => {
let returnVal = paramValue
@@ -533,10 +522,7 @@ export const getVariableValue = (
}
if (isAcceptVariable && variableFullPath === CHAT_HISTORY_VAR_PREFIX) {
variableDict[`{{${variableFullPath}}}`] = handleEscapeCharacters(
typeof chatHistory === 'string' ? chatHistory : convertChatHistoryToText(chatHistory),
false
)
variableDict[`{{${variableFullPath}}}`] = handleEscapeCharacters(convertChatHistoryToText(chatHistory), false)
}
// Split by first occurrence of '.' to get just nodeId
@@ -583,7 +569,7 @@ export const resolveVariables = (
reactFlowNodeData: INodeData,
reactFlowNodes: IReactFlowNode[],
question: string,
chatHistory: IMessage[] | string
chatHistory: IMessage[]
): INodeData => {
let flowNodeData = cloneDeep(reactFlowNodeData)
const types = 'inputs'
@@ -970,21 +956,43 @@ export const redactCredentialWithPasswordType = (
}
/**
* Replace sessionId with new chatId
* Ex: after clear chat history, use the new chatId as sessionId
* Get sessionId
* Hierarchy of sessionId (top down)
* API/Embed:
* (1) Provided in API body - incomingInput.overrideConfig: { sessionId: 'abc' }
* (2) Provided in API body - incomingInput.chatId
*
* API/Embed + UI:
* (3) Hard-coded sessionId in UI
* (4) Not specified on UI nor API, default to chatId
* @param {any} instance
* @param {IncomingInput} incomingInput
* @param {string} chatId
*/
export const checkMemorySessionId = (instance: any, chatId: string): string | undefined => {
if (instance.memory && instance.memory.isSessionIdUsingChatMessageId && chatId) {
instance.memory.sessionId = chatId
instance.memory.chatHistory.sessionId = chatId
export const getMemorySessionId = (
memoryNode: IReactFlowNode,
incomingInput: IncomingInput,
chatId: string,
isInternal: boolean
): string | undefined => {
if (!isInternal) {
// Provided in API body - incomingInput.overrideConfig: { sessionId: 'abc' }
if (incomingInput.overrideConfig?.sessionId) {
return incomingInput.overrideConfig?.sessionId
}
// Provided in API body - incomingInput.chatId
if (incomingInput.chatId) {
return incomingInput.chatId
}
}
if (instance.memory && instance.memory.sessionId) return instance.memory.sessionId
else if (instance.memory && instance.memory.chatHistory && instance.memory.chatHistory.sessionId)
return instance.memory.chatHistory.sessionId
return undefined
// Hard-coded sessionId in UI
if (memoryNode.data.inputs?.sessionId) {
return memoryNode.data.inputs.sessionId
}
// Default chatId
return chatId
}
/**
@@ -996,31 +1004,52 @@ export const checkMemorySessionId = (instance: any, chatId: string): string | un
* @param {any} logger
* @returns {string}
*/
export const replaceChatHistory = async (
export const getSessionChatHistory = async (
memoryNode: IReactFlowNode,
componentNodes: IComponentNodes,
incomingInput: IncomingInput,
appDataSource: DataSource,
databaseEntities: IDatabaseEntity,
logger: any
): Promise<string> => {
const nodeInstanceFilePath = memoryNode.data.filePath as string
): Promise<IMessage[]> => {
const nodeInstanceFilePath = componentNodes[memoryNode.data.name].filePath as string
const nodeModule = await import(nodeInstanceFilePath)
const newNodeInstance = new nodeModule.nodeClass()
// Replace memory's sessionId/chatId
if (incomingInput.overrideConfig?.sessionId && memoryNode.data.inputs) {
memoryNode.data.inputs.sessionId = incomingInput.overrideConfig.sessionId
} else if (incomingInput.chatId && memoryNode.data.inputs) {
memoryNode.data.inputs.sessionId = incomingInput.chatId
}
if (newNodeInstance.memoryMethods && newNodeInstance.memoryMethods.getChatMessages) {
return await newNodeInstance.memoryMethods.getChatMessages(memoryNode.data, {
chatId: incomingInput.chatId,
appDataSource,
databaseEntities,
logger
})
}
const initializedInstance: FlowiseMemory = await newNodeInstance.init(memoryNode.data, '', {
appDataSource,
databaseEntities,
logger
})
return ''
return (await initializedInstance.getChatMessages()) as IMessage[]
}
/**
* Method that find memory that is connected within chatflow
* In a chatflow, there should only be 1 memory node
* @param {IReactFlowNode[]} nodes
* @param {IReactFlowEdge[]} edges
* @returns {string | undefined}
*/
export const findMemoryNode = (nodes: IReactFlowNode[], edges: IReactFlowEdge[]): IReactFlowNode | undefined => {
const memoryNodes = nodes.filter((node) => node.data.category === 'Memory')
const memoryNodeIds = memoryNodes.map((mem) => mem.data.id)
for (const edge of edges) {
if (memoryNodeIds.includes(edge.source)) {
const memoryNode = nodes.find((node) => node.data.id === edge.source)
return memoryNode
}
}
return undefined
}
/**