mirror of
https://github.com/farcasclaudiu/Flowise.git
synced 2026-06-28 11:00:55 +03:00
code cleanup
This commit is contained in:
+135
-55
@@ -9,7 +9,7 @@ import { Server } from 'socket.io'
|
||||
import logger from './utils/logger'
|
||||
import { expressRequestLogger } from './utils/logger'
|
||||
import { v4 as uuidv4 } from 'uuid'
|
||||
|
||||
import { Between, IsNull, FindOptionsWhere } from 'typeorm'
|
||||
import {
|
||||
IChatFlow,
|
||||
IncomingInput,
|
||||
@@ -18,7 +18,9 @@ import {
|
||||
INodeData,
|
||||
IDatabaseExport,
|
||||
ICredentialReturnResponse,
|
||||
chatType
|
||||
chatType,
|
||||
IChatMessage,
|
||||
IReactFlowEdge
|
||||
} from './Interface'
|
||||
import {
|
||||
getNodeModulesPackagePath,
|
||||
@@ -42,10 +44,11 @@ import {
|
||||
getApiKey,
|
||||
transformToCredentialEntity,
|
||||
decryptCredentialData,
|
||||
clearSessionMemory,
|
||||
clearAllSessionMemory,
|
||||
replaceInputsWithConfig,
|
||||
getEncryptionKey,
|
||||
checkMemorySessionId
|
||||
checkMemorySessionId,
|
||||
clearSessionMemoryFromViewMessageDialog
|
||||
} from './utils'
|
||||
import { cloneDeep, omit } from 'lodash'
|
||||
import { getDataSource } from './DataSource'
|
||||
@@ -397,7 +400,39 @@ export class App {
|
||||
|
||||
// Get all chatmessages from chatflowid
|
||||
this.app.get('/api/v1/chatmessage/:id', async (req: Request, res: Response) => {
|
||||
const chatmessages = await this.getChatMessage(req.params.id, undefined)
|
||||
const sortOrder = req.query?.order as string | undefined
|
||||
const chatId = req.query?.chatId as string | undefined
|
||||
const memoryType = req.query?.memoryType as string | undefined
|
||||
const sessionId = req.query?.sessionId as string | undefined
|
||||
const startDate = req.query?.startDate as string | undefined
|
||||
const endDate = req.query?.endDate as string | undefined
|
||||
let chatTypeFilter = req.query?.chatType as chatType | undefined
|
||||
|
||||
if (chatTypeFilter) {
|
||||
try {
|
||||
const chatTypeFilterArray = JSON.parse(chatTypeFilter)
|
||||
if (chatTypeFilterArray.includes(chatType.EXTERNAL) && chatTypeFilterArray.includes(chatType.INTERNAL)) {
|
||||
chatTypeFilter = undefined
|
||||
} else if (chatTypeFilterArray.includes(chatType.EXTERNAL)) {
|
||||
chatTypeFilter = chatType.EXTERNAL
|
||||
} else if (chatTypeFilterArray.includes(chatType.INTERNAL)) {
|
||||
chatTypeFilter = chatType.INTERNAL
|
||||
}
|
||||
} catch (e) {
|
||||
return res.status(500).send(e)
|
||||
}
|
||||
}
|
||||
|
||||
const chatmessages = await this.getChatMessage(
|
||||
req.params.id,
|
||||
chatTypeFilter,
|
||||
sortOrder,
|
||||
chatId,
|
||||
memoryType,
|
||||
sessionId,
|
||||
startDate,
|
||||
endDate
|
||||
)
|
||||
return res.json(chatmessages)
|
||||
})
|
||||
|
||||
@@ -416,27 +451,41 @@ export class App {
|
||||
|
||||
// Delete all chatmessages from chatId
|
||||
this.app.delete('/api/v1/chatmessage/:id', async (req: Request, res: Response) => {
|
||||
const chatId = req.params.id
|
||||
const chatMessage = await this.AppDataSource.getRepository(ChatMessage).findOneBy({
|
||||
chatId: chatId
|
||||
})
|
||||
if (!chatMessage) {
|
||||
res.status(404).send(`chatmessage ${chatId} not found`)
|
||||
return
|
||||
}
|
||||
const chatflowid = req.params.id
|
||||
const chatflow = await this.AppDataSource.getRepository(ChatFlow).findOneBy({
|
||||
id: chatMessage.chatflowid
|
||||
id: chatflowid
|
||||
})
|
||||
if (!chatflow) {
|
||||
res.status(404).send(`Chatflow ${chatMessage.chatflowid} not found`)
|
||||
res.status(404).send(`Chatflow ${chatflowid} not found`)
|
||||
return
|
||||
}
|
||||
const chatId = (req.query?.chatId as string) ?? (await getChatId(chatflowid))
|
||||
const memoryType = req.query?.memoryType as string | undefined
|
||||
const sessionId = req.query?.sessionId as string | undefined
|
||||
const chatType = req.query?.chatType as string | undefined
|
||||
const isClearFromViewMessageDialog = req.query?.isClearFromViewMessageDialog as string | undefined
|
||||
|
||||
const flowData = chatflow.flowData
|
||||
const parsedFlowData: IReactFlowObject = JSON.parse(flowData)
|
||||
const nodes = parsedFlowData.nodes
|
||||
|
||||
clearSessionMemory(nodes, this.nodesPool.componentNodes, chatId, this.AppDataSource, req.query.sessionId as string)
|
||||
const results = await this.AppDataSource.getRepository(ChatMessage).delete({ chatId: chatId })
|
||||
if (isClearFromViewMessageDialog)
|
||||
clearSessionMemoryFromViewMessageDialog(
|
||||
nodes,
|
||||
this.nodesPool.componentNodes,
|
||||
chatId,
|
||||
this.AppDataSource,
|
||||
sessionId,
|
||||
memoryType
|
||||
)
|
||||
else clearAllSessionMemory(nodes, this.nodesPool.componentNodes, chatId, this.AppDataSource, sessionId)
|
||||
|
||||
const deleteOptions: FindOptionsWhere<ChatMessage> = { chatflowid, chatId }
|
||||
if (memoryType) deleteOptions.memoryType = memoryType
|
||||
if (sessionId) deleteOptions.sessionId = sessionId
|
||||
if (chatType) deleteOptions.chatType = chatType
|
||||
|
||||
const results = await this.AppDataSource.getRepository(ChatMessage).delete(deleteOptions)
|
||||
return res.json(results)
|
||||
})
|
||||
|
||||
@@ -831,30 +880,51 @@ export class App {
|
||||
|
||||
/**
|
||||
* Method that get chat messages.
|
||||
*
|
||||
* @param chatflowid -
|
||||
* @param chatType -
|
||||
*
|
||||
* @param {string} chatflowid
|
||||
* @param {chatType} chatType
|
||||
* @param {string} sortOrder
|
||||
* @param {string} chatId
|
||||
* @param {string} memoryType
|
||||
* @param {string} sessionId
|
||||
* @param {string} startDate
|
||||
* @param {string} endDate
|
||||
*/
|
||||
async getChatMessage(chatflowid: string, chatType: chatType | undefined): Promise<ChatMessage[]> {
|
||||
async getChatMessage(
|
||||
chatflowid: string,
|
||||
chatType: chatType | undefined,
|
||||
sortOrder: string = 'ASC',
|
||||
chatId?: string,
|
||||
memoryType?: string,
|
||||
sessionId?: string,
|
||||
startDate?: string,
|
||||
endDate?: string
|
||||
): Promise<ChatMessage[]> {
|
||||
let fromDate
|
||||
if (startDate) fromDate = new Date(startDate)
|
||||
|
||||
let toDate
|
||||
if (endDate) toDate = new Date(endDate)
|
||||
|
||||
return await this.AppDataSource.getRepository(ChatMessage).find({
|
||||
where: {
|
||||
chatflowid: chatflowid,
|
||||
chatType: chatType
|
||||
chatflowid,
|
||||
chatType,
|
||||
chatId,
|
||||
memoryType: memoryType ?? (chatId ? IsNull() : undefined),
|
||||
sessionId: sessionId ?? (chatId ? IsNull() : undefined),
|
||||
createdDate: toDate && fromDate ? Between(fromDate, toDate) : undefined
|
||||
},
|
||||
order: {
|
||||
createdDate: 'ASC'
|
||||
createdDate: sortOrder === 'DESC' ? 'DESC' : 'ASC'
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Method that add chat messages.
|
||||
*
|
||||
* @param chatMessage -
|
||||
*
|
||||
* @param {Partial<IChatMessage>} chatMessage
|
||||
*/
|
||||
async addChatMessage(chatMessage: any): Promise<ChatMessage> {
|
||||
async addChatMessage(chatMessage: Partial<IChatMessage>): Promise<ChatMessage> {
|
||||
const newChatMessage = new ChatMessage()
|
||||
Object.assign(newChatMessage, chatMessage)
|
||||
|
||||
@@ -863,17 +933,23 @@ export class App {
|
||||
}
|
||||
|
||||
/**
|
||||
* Method that find memory label.
|
||||
*
|
||||
* @param nodes -
|
||||
*
|
||||
* Method that find memory label that is connected within chatflow
|
||||
* In a chatflow, there should only be 1 memory node
|
||||
* @param {IReactFlowNode[]} nodes
|
||||
* @param {IReactFlowEdge[]} edges
|
||||
* @returns {string | undefined}
|
||||
*/
|
||||
findMemoryLabel(nodes: any[]): string | undefined {
|
||||
const memoryNode = nodes.find((node) => {
|
||||
return node.data.category === 'Memory'
|
||||
})
|
||||
findMemoryLabel(nodes: IReactFlowNode[], edges: IReactFlowEdge[]): string | undefined {
|
||||
const memoryNodes = nodes.filter((node) => node.data.category === 'Memory')
|
||||
const memoryNodeIds = memoryNodes.map((mem) => mem.data.id)
|
||||
|
||||
return memoryNode ? memoryNode.data.label : undefined
|
||||
for (const edge of edges) {
|
||||
if (memoryNodeIds.includes(edge.source)) {
|
||||
const memoryNode = nodes.find((node) => node.data.id === edge.source)
|
||||
return memoryNode ? memoryNode.data.label : undefined
|
||||
}
|
||||
}
|
||||
return undefined
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -883,7 +959,7 @@ export class App {
|
||||
* @param {Server} socketIO
|
||||
* @param {boolean} isInternal
|
||||
*/
|
||||
async processPrediction(req: Request, res: Response, socketIO?: Server, isInternal = false) {
|
||||
async processPrediction(req: Request, res: Response, socketIO?: Server, isInternal: boolean = false) {
|
||||
try {
|
||||
const chatflowid = req.params.id
|
||||
let incomingInput: IncomingInput = req.body
|
||||
@@ -895,7 +971,7 @@ export class App {
|
||||
})
|
||||
if (!chatflow) return res.status(404).send(`Chatflow ${chatflowid} not found`)
|
||||
|
||||
const chatId = incomingInput.chatId ?? uuidv4()
|
||||
const chatId = incomingInput.chatId ?? incomingInput.overrideConfig?.sessionId ?? uuidv4()
|
||||
const userMessageDateTime = new Date()
|
||||
|
||||
if (!isInternal) {
|
||||
@@ -1033,9 +1109,9 @@ export class App {
|
||||
logger.debug(`[server]: Running ${nodeToExecuteData.label} (${nodeToExecuteData.id})`)
|
||||
|
||||
let sessionId = undefined
|
||||
let memoryLabel = undefined
|
||||
if (nodeToExecuteData.instance) sessionId = checkMemorySessionId(nodeToExecuteData.instance, chatId)
|
||||
if (sessionId) memoryLabel = this.findMemoryLabel(nodes)
|
||||
|
||||
const memoryType = this.findMemoryLabel(nodes, edges)
|
||||
|
||||
let result = isStreamValid
|
||||
? await nodeInstance.run(nodeToExecuteData, incomingInput.question, {
|
||||
@@ -1057,31 +1133,35 @@ export class App {
|
||||
|
||||
result = typeof result === 'string' ? { text: result } : result
|
||||
|
||||
await this.addChatMessage({
|
||||
const userMessage: Omit<IChatMessage, 'id'> = {
|
||||
role: 'userMessage',
|
||||
content: incomingInput.question,
|
||||
chatflowid: chatflowid,
|
||||
chatflowid,
|
||||
chatType: isInternal ? chatType.INTERNAL : chatType.EXTERNAL,
|
||||
chatId: chatId,
|
||||
memoryType: memoryLabel ? memoryLabel : undefined,
|
||||
sessionId: sessionId ? sessionId : undefined,
|
||||
chatId,
|
||||
memoryType,
|
||||
sessionId,
|
||||
createdDate: userMessageDateTime
|
||||
})
|
||||
}
|
||||
await this.addChatMessage(userMessage)
|
||||
|
||||
const apiMessage: any = {
|
||||
const apiMessage: Omit<IChatMessage, 'id' | 'createdDate'> = {
|
||||
role: 'apiMessage',
|
||||
content: result.text,
|
||||
chatflowid: chatflowid,
|
||||
chatflowid,
|
||||
chatType: isInternal ? chatType.INTERNAL : chatType.EXTERNAL,
|
||||
chatId: chatId,
|
||||
memoryType: memoryLabel ? memoryLabel : undefined,
|
||||
sessionId: sessionId ? sessionId : undefined
|
||||
chatId,
|
||||
memoryType,
|
||||
sessionId
|
||||
}
|
||||
if (result?.sourceDocuments) apiMessage.sourceDocuments = JSON.stringify(result.sourceDocuments)
|
||||
await this.addChatMessage(apiMessage)
|
||||
|
||||
logger.debug(`[server]: Finished running ${nodeToExecuteData.label} (${nodeToExecuteData.id})`)
|
||||
if (incomingInput.chatId) result.chatId = chatId
|
||||
|
||||
// Only return ChatId when its Internal OR incoming input has ChatId, to avoid confusion when calling API
|
||||
if (incomingInput.chatId || isInternal) result.chatId = chatId
|
||||
|
||||
return res.json(result)
|
||||
} catch (e: any) {
|
||||
logger.error('[server]: Error:', e)
|
||||
@@ -1104,7 +1184,7 @@ export class App {
|
||||
* @param {string} chatflowid
|
||||
* @returns {string}
|
||||
*/
|
||||
export async function getChatId(chatflowid: string) {
|
||||
export async function getChatId(chatflowid: string): Promise<string> {
|
||||
// first chatmessage id as the unique chat id
|
||||
const firstChatMessage = await getDataSource()
|
||||
.getRepository(ChatMessage)
|
||||
|
||||
@@ -298,14 +298,14 @@ export const buildLangchain = async (
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear memory
|
||||
* Clear all session memories on the canvas
|
||||
* @param {IReactFlowNode[]} reactFlowNodes
|
||||
* @param {IComponentNodes} componentNodes
|
||||
* @param {string} chatId
|
||||
* @param {DataSource} appDataSource
|
||||
* @param {string} sessionId
|
||||
*/
|
||||
export const clearSessionMemory = async (
|
||||
export const clearAllSessionMemory = async (
|
||||
reactFlowNodes: IReactFlowNode[],
|
||||
componentNodes: IComponentNodes,
|
||||
chatId: string,
|
||||
@@ -317,9 +317,46 @@ export const clearSessionMemory = async (
|
||||
const nodeInstanceFilePath = componentNodes[node.data.name].filePath as string
|
||||
const nodeModule = await import(nodeInstanceFilePath)
|
||||
const newNodeInstance = new nodeModule.nodeClass()
|
||||
|
||||
if (sessionId && node.data.inputs) node.data.inputs.sessionId = sessionId
|
||||
if (newNodeInstance.clearSessionMemory)
|
||||
|
||||
if (newNodeInstance.clearSessionMemory) {
|
||||
await newNodeInstance?.clearSessionMemory(node.data, { chatId, appDataSource, databaseEntities, logger })
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear specific session memory from View Message Dialog UI
|
||||
* @param {IReactFlowNode[]} reactFlowNodes
|
||||
* @param {IComponentNodes} componentNodes
|
||||
* @param {string} chatId
|
||||
* @param {DataSource} appDataSource
|
||||
* @param {string} sessionId
|
||||
* @param {string} memoryType
|
||||
*/
|
||||
export const clearSessionMemoryFromViewMessageDialog = async (
|
||||
reactFlowNodes: IReactFlowNode[],
|
||||
componentNodes: IComponentNodes,
|
||||
chatId: string,
|
||||
appDataSource: DataSource,
|
||||
sessionId?: string,
|
||||
memoryType?: string
|
||||
) => {
|
||||
if (!sessionId) return
|
||||
for (const node of reactFlowNodes) {
|
||||
if (node.data.category !== 'Memory') continue
|
||||
if (node.data.label !== memoryType) continue
|
||||
const nodeInstanceFilePath = componentNodes[node.data.name].filePath as string
|
||||
const nodeModule = await import(nodeInstanceFilePath)
|
||||
const newNodeInstance = new nodeModule.nodeClass()
|
||||
|
||||
if (sessionId && node.data.inputs) node.data.inputs.sessionId = sessionId
|
||||
|
||||
if (newNodeInstance.clearSessionMemory) {
|
||||
await newNodeInstance?.clearSessionMemory(node.data, { chatId, appDataSource, databaseEntities, logger })
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -933,5 +970,5 @@ export const checkMemorySessionId = (instance: any, chatId: string): string => {
|
||||
instance.memory.sessionId = chatId
|
||||
instance.memory.chatHistory.sessionId = chatId
|
||||
}
|
||||
return instance.memory.sessionId
|
||||
return instance.memory ? instance.memory.sessionId ?? instance.memory.chatHistory.sessionId : undefined
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user