feature: modularized express routes for reusability, testability, composability and performance (#2030)

* transition GET /api/v1/apikey

* transition POST /api/v1/apikey

* transition PUT /api/v1/apikey/:id

* transition DELETE /api/v1/apikey/:id

* Enable e2e tests for api/v1/apikey routes

* remove unused addChatflowsCount

* Enable e2e tests for api/v1/variables routes

* Enable Cypress in GitHub Action

* Update main.yml

* Update main.yml

* Transition GET /api/v1/variables

* Enable cypress on github workflow

* Transition POST /api/v1/variables

* Transition PUT /api/v1/variables

* Transition DELETE /api/v1/variables

* Transition GET /api/v1/variables

* Transition GET /api/v1/chatflows

* Transition GET /api/v1/chatflows/:id

* Transition POST /api/v1/chatflows

* Transition DELETE /api/v1/chatflows/:id

* Transition PUT /api/v1/chatflows/:id

* Transition GET /api/v1/chatflows/apikey/:apiKey

* Transition GET /api/v1/credentials

* Transition POST /api/v1/credentials

* Transition GET /api/v1/credentials/:id

* Transition PUT /api/v1/credentials/:id

* Transition DELETE /api/v1/credentials/:id

* Transition GET /api/v1/tools

* Transition GET /api/v1/tools/:id

* Transition POST /api/v1/tools

* Transition PUT & DELETE /api/v1/tools/:id

* Transition /api/v1/assistants routes

* Transition /api/v1/nodes routes

* Transition GET /api/v1/chatflows-streaming/:id & GET /api/v1/chatflows-uploads/:id

* wip-all-routes

* Transition GET /api/v1/public-chatflows/:id & /api/v1/public-chatbotConfig/:id

* Remove ts-ignore annotations

* Transition GET /api/v1/chatmessage/:id

* Transition POST /api/v1/chatmessage/:id

* delete /api/v1/chatmessage/:id

* transition /api/v1/feedback/:id routes

* transition /api/v1/stats/:id

* Transition GET /api/v1/openai-assistants/:id

* Transition GET /api/v1/openai-assistants

* Transition POST /api/v1/openai-assistants-file

* transition GET /api/v1/get-upload-path

* transition GET /api/v1/get-upload-file

* transition GET /api/v1/flow-config/:id

* transition POST /api/v1/node-config

* transition GET /api/v1/version

* transition GET /api/v1/fetch-links

* transition POST /api/v1/vector/upsert/:id

* transition POST /api/v1/vector/internal-upsert/:id

* transition POST /api/v1/load-prompt

* Update index.ts

* transition POST /api/v1/prompts-list

* transition predictions

* Update index.ts

* transition GET /api/v1/marketplaces/templates

* Router update modularity cleanup

* extend request interface - express namespace

* Update index.ts

* add errorMiddleware

* Add custom application error handler

* Fix pnpm lock file

* prediction return and vector upsert

* Move the getUploadsConfig into its own file

* Remove lint warnings

* fix undefined variable value

* Fix node-load-method api call

* standardize the error message display

* Apply review comment bugfixes

* Update index.ts

* standardize error message display  in snack notifications

* Error message standard in the UI

* Rename flowXpressApp to appServer

* Upload middleware fix and axios update

* fix async await

---------

Co-authored-by: Henry <hzj94@hotmail.com>
This commit is contained in:
Octavian FlowiseAI
2024-04-02 17:44:04 +02:00
committed by GitHub
parent ea255db15d
commit 957694a912
136 changed files with 5347 additions and 2380 deletions
@@ -0,0 +1,19 @@
import { ChatMessage } from '../database/entities/ChatMessage'
import { IChatMessage } from '../Interface'
import { getRunningExpressApp } from '../utils/getRunningExpressApp'
/**
* Method that add chat messages.
* @param {Partial<IChatMessage>} chatMessage
*/
export const utilAddChatMessage = async (chatMessage: Partial<IChatMessage>): Promise<ChatMessage> => {
const appServer = getRunningExpressApp()
const newChatMessage = new ChatMessage()
Object.assign(newChatMessage, chatMessage)
if (!newChatMessage.createdDate) {
newChatMessage.createdDate = new Date()
}
const chatmessage = await appServer.AppDataSource.getRepository(ChatMessage).create(newChatMessage)
const dbResponse = await appServer.AppDataSource.getRepository(ChatMessage).save(chatmessage)
return dbResponse
}
@@ -0,0 +1,16 @@
import { ChatMessageFeedback } from '../database/entities/ChatMessageFeedback'
import { IChatMessageFeedback } from '../Interface'
import { getRunningExpressApp } from '../utils/getRunningExpressApp'
/**
* Method that add chat message feedback.
* @param {Partial<IChatMessageFeedback>} chatMessageFeedback
*/
export const utilAddChatMessageFeedback = async (chatMessageFeedback: Partial<IChatMessageFeedback>): Promise<ChatMessageFeedback> => {
const appServer = getRunningExpressApp()
const newChatMessageFeedback = new ChatMessageFeedback()
Object.assign(newChatMessageFeedback, chatMessageFeedback)
const feedback = await appServer.AppDataSource.getRepository(ChatMessageFeedback).create(newChatMessageFeedback)
return await appServer.AppDataSource.getRepository(ChatMessageFeedback).save(feedback)
}
@@ -0,0 +1,33 @@
import { ChatFlow } from '../database/entities/ChatFlow'
import { getRunningExpressApp } from '../utils/getRunningExpressApp'
export const addChatflowsCount = async (keys: any) => {
try {
const appServer = getRunningExpressApp()
let tmpResult = keys
if (typeof keys !== 'undefined' && keys.length > 0) {
const updatedKeys: any[] = []
//iterate through keys and get chatflows
for (const key of keys) {
const chatflows = await appServer.AppDataSource.getRepository(ChatFlow)
.createQueryBuilder('cf')
.where('cf.apikeyid = :apikeyid', { apikeyid: key.id })
.getMany()
const linkedChatFlows: any[] = []
chatflows.map((cf) => {
linkedChatFlows.push({
flowName: cf.name,
category: cf.category,
updatedDate: cf.updatedDate
})
})
key.chatFlows = linkedChatFlows
updatedKeys.push(key)
}
tmpResult = updatedKeys
}
return tmpResult
} catch (error) {
throw new Error(`Error: addChatflowsCount - ${error}`)
}
}
+427
View File
@@ -0,0 +1,427 @@
import { Request } from 'express'
import { IFileUpload, getStoragePath, convertSpeechToText, ICommonObject } from 'flowise-components'
import { StatusCodes } from 'http-status-codes'
import { IncomingInput, IMessage, INodeData, IReactFlowObject, IReactFlowNode, IDepthQueue, chatType, IChatMessage } from '../Interface'
import path from 'path'
import { ChatFlow } from '../database/entities/ChatFlow'
import { Server } from 'socket.io'
import { getRunningExpressApp } from '../utils/getRunningExpressApp'
import {
mapMimeTypeToInputField,
isFlowValidForStream,
buildFlow,
getTelemetryFlowObj,
getAppVersion,
resolveVariables,
getSessionChatHistory,
findMemoryNode,
replaceInputsWithConfig,
getStartingNodes,
isStartNodeDependOnInput,
getMemorySessionId,
isSameOverrideConfig,
getEndingNodes,
constructGraphs
} from '../utils'
import { utilValidateKey } from './validateKey'
import { databaseEntities } from '.'
import { v4 as uuidv4 } from 'uuid'
import { omit } from 'lodash'
import * as fs from 'fs'
import logger from './logger'
import { utilAddChatMessage } from './addChatMesage'
/**
* Build Chatflow
* @param {Request} req
* @param {Server} socketIO
* @param {boolean} isInternal
* @param {boolean} isUpsert
*/
export const utilBuildChatflow = async (req: Request, socketIO?: Server, isInternal: boolean = false): Promise<any> => {
try {
const appServer = getRunningExpressApp()
const chatflowid = req.params.id
let incomingInput: IncomingInput = req.body
let nodeToExecuteData: INodeData
const chatflow = await appServer.AppDataSource.getRepository(ChatFlow).findOneBy({
id: chatflowid
})
if (!chatflow) {
return {
executionError: true,
status: StatusCodes.NOT_FOUND,
msg: `Chatflow ${chatflowid} not found`
}
}
const chatId = incomingInput.chatId ?? incomingInput.overrideConfig?.sessionId ?? uuidv4()
const userMessageDateTime = new Date()
if (!isInternal) {
const isKeyValidated = await utilValidateKey(req, chatflow)
if (!isKeyValidated) {
return {
executionError: true,
status: StatusCodes.UNAUTHORIZED,
msg: `Unauthorized`
}
}
}
let fileUploads: IFileUpload[] = []
if (incomingInput.uploads) {
fileUploads = incomingInput.uploads
for (let i = 0; i < fileUploads.length; i += 1) {
const upload = fileUploads[i]
if ((upload.type === 'file' || upload.type === 'audio') && upload.data) {
const filename = upload.name
const dir = path.join(getStoragePath(), chatflowid, chatId)
if (!fs.existsSync(dir)) {
fs.mkdirSync(dir, { recursive: true })
}
const filePath = path.join(dir, filename)
const splitDataURI = upload.data.split(',')
const bf = Buffer.from(splitDataURI.pop() || '', 'base64')
fs.writeFileSync(filePath, bf)
// Omit upload.data since we don't store the content in database
upload.type = 'stored-file'
fileUploads[i] = omit(upload, ['data'])
}
// Run Speech to Text conversion
if (upload.mime === 'audio/webm') {
let speechToTextConfig: ICommonObject = {}
if (chatflow.speechToText) {
const speechToTextProviders = JSON.parse(chatflow.speechToText)
for (const provider in speechToTextProviders) {
const providerObj = speechToTextProviders[provider]
if (providerObj.status) {
speechToTextConfig = providerObj
speechToTextConfig['name'] = provider
break
}
}
}
if (speechToTextConfig) {
const options: ICommonObject = {
chatId,
chatflowid,
appDataSource: appServer.AppDataSource,
databaseEntities: databaseEntities
}
const speechToTextResult = await convertSpeechToText(upload, speechToTextConfig, options)
if (speechToTextResult) {
incomingInput.question = speechToTextResult
}
}
}
}
}
let isStreamValid = false
const files = (req.files as any[]) || []
if (files.length) {
const overrideConfig: ICommonObject = { ...req.body }
for (const file of files) {
const fileData = fs.readFileSync(file.path, { encoding: 'base64' })
const dataBase64String = `data:${file.mimetype};base64,${fileData},filename:${file.filename}`
const fileInputField = mapMimeTypeToInputField(file.mimetype)
if (overrideConfig[fileInputField]) {
overrideConfig[fileInputField] = JSON.stringify([...JSON.parse(overrideConfig[fileInputField]), dataBase64String])
} else {
overrideConfig[fileInputField] = JSON.stringify([dataBase64String])
}
}
incomingInput = {
question: req.body.question ?? 'hello',
overrideConfig,
history: [],
socketIOClientId: req.body.socketIOClientId
}
}
/*** Get chatflows and prepare data ***/
const flowData = chatflow.flowData
const parsedFlowData: IReactFlowObject = JSON.parse(flowData)
const nodes = parsedFlowData.nodes
const edges = parsedFlowData.edges
// Get session ID
const memoryNode = findMemoryNode(nodes, edges)
const memoryType = memoryNode?.data.label
let sessionId = undefined
if (memoryNode) sessionId = getMemorySessionId(memoryNode, incomingInput, chatId, isInternal)
/* Reuse the flow without having to rebuild (to avoid duplicated upsert, recomputation, reinitialization of memory) when all these conditions met:
* - Node Data already exists in pool
* - Still in sync (i.e the flow has not been modified since)
* - Existing overrideConfig and new overrideConfig are the same
* - Flow doesn't start with/contain nodes that depend on incomingInput.question
* TODO: convert overrideConfig to hash when we no longer store base64 string but filepath
***/
const isFlowReusable = () => {
return (
Object.prototype.hasOwnProperty.call(appServer.chatflowPool.activeChatflows, chatflowid) &&
appServer.chatflowPool.activeChatflows[chatflowid].inSync &&
appServer.chatflowPool.activeChatflows[chatflowid].endingNodeData &&
isSameOverrideConfig(
isInternal,
appServer.chatflowPool.activeChatflows[chatflowid].overrideConfig,
incomingInput.overrideConfig
) &&
!isStartNodeDependOnInput(appServer.chatflowPool.activeChatflows[chatflowid].startingNodes, nodes)
)
}
if (isFlowReusable()) {
nodeToExecuteData = appServer.chatflowPool.activeChatflows[chatflowid].endingNodeData as INodeData
isStreamValid = isFlowValidForStream(nodes, nodeToExecuteData)
logger.debug(
`[server]: Reuse existing chatflow ${chatflowid} with ending node ${nodeToExecuteData.label} (${nodeToExecuteData.id})`
)
} else {
/*** Get Ending Node with Directed Graph ***/
const { graph, nodeDependencies } = constructGraphs(nodes, edges)
const directedGraph = graph
const endingNodeIds = getEndingNodes(nodeDependencies, directedGraph)
if (!endingNodeIds.length) {
return {
executionError: true,
status: 500,
msg: `Ending nodes not found`
}
}
const endingNodes = nodes.filter((nd) => endingNodeIds.includes(nd.id))
let isEndingNodeExists = endingNodes.find((node) => node.data?.outputs?.output === 'EndingNode')
for (const endingNode of endingNodes) {
const endingNodeData = endingNode.data
if (!endingNodeData) {
return {
executionError: true,
status: 500,
msg: `Ending node ${endingNode.id} data not found`
}
}
const isEndingNode = endingNodeData?.outputs?.output === 'EndingNode'
if (!isEndingNode) {
if (
endingNodeData &&
endingNodeData.category !== 'Chains' &&
endingNodeData.category !== 'Agents' &&
endingNodeData.category !== 'Engine'
) {
return {
executionError: true,
status: 500,
msg: `Ending node must be either a Chain or Agent`
}
}
if (
endingNodeData.outputs &&
Object.keys(endingNodeData.outputs).length &&
!Object.values(endingNodeData.outputs ?? {}).includes(endingNodeData.name)
) {
return {
executionError: true,
status: 500,
msg: `Output of ${endingNodeData.label} (${endingNodeData.id}) must be ${endingNodeData.label}, can't be an Output Prediction`
}
}
}
isStreamValid = isFlowValidForStream(nodes, endingNodeData)
}
// Once custom function ending node exists, flow is always unavailable to stream
isStreamValid = isEndingNodeExists ? false : isStreamValid
let chatHistory: IMessage[] = incomingInput.history ?? []
// When {{chat_history}} is used in Prompt Template, fetch the chat conversations from memory node
for (const endingNode of endingNodes) {
const endingNodeData = endingNode.data
if (!endingNodeData.inputs?.memory) continue
const memoryNodeId = endingNodeData.inputs?.memory.split('.')[0].replace('{{', '')
const memoryNode = nodes.find((node) => node.data.id === memoryNodeId)
if (!memoryNode) continue
if (!chatHistory.length && (incomingInput.chatId || incomingInput.overrideConfig?.sessionId)) {
chatHistory = await getSessionChatHistory(
memoryNode,
appServer.nodesPool.componentNodes,
incomingInput,
appServer.AppDataSource,
databaseEntities,
logger
)
}
}
/*** Get Starting Nodes with Reversed Graph ***/
const constructedObj = constructGraphs(nodes, edges, { isReversed: true })
const nonDirectedGraph = constructedObj.graph
let startingNodeIds: string[] = []
let depthQueue: IDepthQueue = {}
for (const endingNodeId of endingNodeIds) {
const resx = getStartingNodes(nonDirectedGraph, endingNodeId)
startingNodeIds.push(...resx.startingNodeIds)
depthQueue = Object.assign(depthQueue, resx.depthQueue)
}
startingNodeIds = [...new Set(startingNodeIds)]
const startingNodes = nodes.filter((nd) => startingNodeIds.includes(nd.id))
logger.debug(`[server]: Start building chatflow ${chatflowid}`)
/*** BFS to traverse from Starting Nodes to Ending Node ***/
const reactFlowNodes = await buildFlow(
startingNodeIds,
nodes,
edges,
graph,
depthQueue,
appServer.nodesPool.componentNodes,
incomingInput.question,
chatHistory,
chatId,
sessionId ?? '',
chatflowid,
appServer.AppDataSource,
incomingInput?.overrideConfig,
appServer.cachePool,
false,
undefined,
incomingInput.uploads
)
const nodeToExecute =
endingNodeIds.length === 1
? reactFlowNodes.find((node: IReactFlowNode) => endingNodeIds[0] === node.id)
: reactFlowNodes[reactFlowNodes.length - 1]
if (!nodeToExecute) {
return {
executionError: true,
status: 404,
msg: `Node not found`
}
}
if (incomingInput.overrideConfig) {
nodeToExecute.data = replaceInputsWithConfig(nodeToExecute.data, incomingInput.overrideConfig)
}
const reactFlowNodeData: INodeData = resolveVariables(nodeToExecute.data, reactFlowNodes, incomingInput.question, chatHistory)
nodeToExecuteData = reactFlowNodeData
appServer.chatflowPool.add(chatflowid, nodeToExecuteData, startingNodes, incomingInput?.overrideConfig)
}
logger.debug(`[server]: Running ${nodeToExecuteData.label} (${nodeToExecuteData.id})`)
const nodeInstanceFilePath = appServer.nodesPool.componentNodes[nodeToExecuteData.name].filePath as string
const nodeModule = await import(nodeInstanceFilePath)
const nodeInstance = new nodeModule.nodeClass({ sessionId })
let result = isStreamValid
? await nodeInstance.run(nodeToExecuteData, incomingInput.question, {
chatId,
chatflowid,
chatHistory: incomingInput.history,
logger,
appDataSource: appServer.AppDataSource,
databaseEntities,
analytic: chatflow.analytic,
uploads: incomingInput.uploads,
socketIO,
socketIOClientId: incomingInput.socketIOClientId
})
: await nodeInstance.run(nodeToExecuteData, incomingInput.question, {
chatId,
chatflowid,
chatHistory: incomingInput.history,
logger,
appDataSource: appServer.AppDataSource,
databaseEntities,
analytic: chatflow.analytic,
uploads: incomingInput.uploads
})
result = typeof result === 'string' ? { text: result } : result
// Retrieve threadId from assistant if exists
if (typeof result === 'object' && result.assistant) {
sessionId = result.assistant.threadId
}
const userMessage: Omit<IChatMessage, 'id'> = {
role: 'userMessage',
content: incomingInput.question,
chatflowid,
chatType: isInternal ? chatType.INTERNAL : chatType.EXTERNAL,
chatId,
memoryType,
sessionId,
createdDate: userMessageDateTime,
fileUploads: incomingInput.uploads ? JSON.stringify(fileUploads) : undefined
}
await utilAddChatMessage(userMessage)
let resultText = ''
if (result.text) resultText = result.text
else if (result.json) resultText = '```json\n' + JSON.stringify(result.json, null, 2)
else resultText = JSON.stringify(result, null, 2)
const apiMessage: Omit<IChatMessage, 'id' | 'createdDate'> = {
role: 'apiMessage',
content: resultText,
chatflowid,
chatType: isInternal ? chatType.INTERNAL : chatType.EXTERNAL,
chatId,
memoryType,
sessionId
}
if (result?.sourceDocuments) apiMessage.sourceDocuments = JSON.stringify(result.sourceDocuments)
if (result?.usedTools) apiMessage.usedTools = JSON.stringify(result.usedTools)
if (result?.fileAnnotations) apiMessage.fileAnnotations = JSON.stringify(result.fileAnnotations)
const chatMessage = await utilAddChatMessage(apiMessage)
logger.debug(`[server]: Finished running ${nodeToExecuteData.label} (${nodeToExecuteData.id})`)
await appServer.telemetry.sendTelemetry('prediction_sent', {
version: await getAppVersion(),
chatflowId: chatflowid,
chatId,
type: isInternal ? chatType.INTERNAL : chatType.EXTERNAL,
flowGraph: getTelemetryFlowObj(nodes, edges)
})
// Prepare response
// return the question in the response
// this is used when input text is empty but question is in audio format
result.question = incomingInput.question
result.chatId = chatId
result.chatMessageId = chatMessage.id
if (sessionId) result.sessionId = sessionId
if (memoryType) result.memoryType = memoryType
return result
} catch (e: any) {
logger.error('[server]: Error:', e)
return {
executionError: true,
status: 500,
msg: e.message
}
}
}
+101
View File
@@ -0,0 +1,101 @@
import { MoreThanOrEqual, LessThanOrEqual } from 'typeorm'
import { chatType } from '../Interface'
import { ChatMessage } from '../database/entities/ChatMessage'
import { ChatMessageFeedback } from '../database/entities/ChatMessageFeedback'
import { getRunningExpressApp } from '../utils/getRunningExpressApp'
/**
* Method that get chat messages.
* @param {string} chatflowid
* @param {chatType} chatType
* @param {string} sortOrder
* @param {string} chatId
* @param {string} memoryType
* @param {string} sessionId
* @param {string} startDate
* @param {string} endDate
* @param {boolean} feedback
*/
export const utilGetChatMessage = async (
chatflowid: string,
chatType: chatType | undefined,
sortOrder: string = 'ASC',
chatId?: string,
memoryType?: string,
sessionId?: string,
startDate?: string,
endDate?: string,
messageId?: string,
feedback?: boolean
): Promise<ChatMessage[]> => {
const appServer = getRunningExpressApp()
const setDateToStartOrEndOfDay = (dateTimeStr: string, setHours: 'start' | 'end') => {
const date = new Date(dateTimeStr)
if (isNaN(date.getTime())) {
return undefined
}
setHours === 'start' ? date.setHours(0, 0, 0, 0) : date.setHours(23, 59, 59, 999)
return date
}
const aMonthAgo = () => {
const date = new Date()
date.setMonth(new Date().getMonth() - 1)
return date
}
let fromDate
if (startDate) fromDate = setDateToStartOrEndOfDay(startDate, 'start')
let toDate
if (endDate) toDate = setDateToStartOrEndOfDay(endDate, 'end')
if (feedback) {
const query = await appServer.AppDataSource.getRepository(ChatMessage).createQueryBuilder('chat_message')
// do the join with chat message feedback based on messageId for each chat message in the chatflow
query
.leftJoinAndMapOne('chat_message.feedback', ChatMessageFeedback, 'feedback', 'feedback.messageId = chat_message.id')
.where('chat_message.chatflowid = :chatflowid', { chatflowid })
// based on which parameters are available add `andWhere` clauses to the query
if (chatType) {
query.andWhere('chat_message.chatType = :chatType', { chatType })
}
if (chatId) {
query.andWhere('chat_message.chatId = :chatId', { chatId })
}
if (memoryType) {
query.andWhere('chat_message.memoryType = :memoryType', { memoryType })
}
if (sessionId) {
query.andWhere('chat_message.sessionId = :sessionId', { sessionId })
}
// set date range
query.andWhere('chat_message.createdDate BETWEEN :fromDate AND :toDate', {
fromDate: fromDate ?? aMonthAgo(),
toDate: toDate ?? new Date()
})
// sort
query.orderBy('chat_message.createdDate', sortOrder === 'DESC' ? 'DESC' : 'ASC')
const messages = await query.getMany()
return messages
}
return await appServer.AppDataSource.getRepository(ChatMessage).find({
where: {
chatflowid,
chatType,
chatId,
memoryType: memoryType ?? undefined,
sessionId: sessionId ?? undefined,
...(fromDate && { createdDate: MoreThanOrEqual(fromDate) }),
...(toDate && { createdDate: LessThanOrEqual(toDate) }),
id: messageId ?? undefined
},
order: {
createdDate: sortOrder === 'DESC' ? 'DESC' : 'ASC'
}
})
}
@@ -0,0 +1,36 @@
import { Between } from 'typeorm'
import { ChatMessageFeedback } from '../database/entities/ChatMessageFeedback'
import { getRunningExpressApp } from '../utils/getRunningExpressApp'
/**
* Method that get chat messages.
* @param {string} chatflowid
* @param {string} sortOrder
* @param {string} chatId
* @param {string} startDate
* @param {string} endDate
*/
export const utilGetChatMessageFeedback = async (
chatflowid: string,
chatId?: string,
sortOrder: string = 'ASC',
startDate?: string,
endDate?: string
): Promise<ChatMessageFeedback[]> => {
const appServer = getRunningExpressApp()
let fromDate
if (startDate) fromDate = new Date(startDate)
let toDate
if (endDate) toDate = new Date(endDate)
return await appServer.AppDataSource.getRepository(ChatMessageFeedback).find({
where: {
chatflowid,
chatId,
createdDate: toDate && fromDate ? Between(fromDate, toDate) : undefined
},
order: {
createdDate: sortOrder === 'DESC' ? 'DESC' : 'ASC'
}
})
}
@@ -0,0 +1,9 @@
import * as Server from '../index'
export const getRunningExpressApp = function () {
const runningExpressInstance = Server.getInstance()
if (typeof runningExpressInstance === 'undefined' || typeof runningExpressInstance.nodesPool === 'undefined') {
throw new Error(`Error: getRunningExpressApp failed!`)
}
return runningExpressInstance
}
@@ -0,0 +1,72 @@
import { ChatFlow } from '../database/entities/ChatFlow'
import { getRunningExpressApp } from '../utils/getRunningExpressApp'
import { IUploadFileSizeAndTypes, IReactFlowNode } from '../Interface'
import { INodeParams } from 'flowise-components'
/**
* Method that checks if uploads are enabled in the chatflow
* @param {string} chatflowid
*/
export const utilGetUploadsConfig = async (chatflowid: string): Promise<any> => {
const appServer = getRunningExpressApp()
const chatflow = await appServer.AppDataSource.getRepository(ChatFlow).findOneBy({
id: chatflowid
})
if (!chatflow) return `Chatflow ${chatflowid} not found`
const uploadAllowedNodes = ['llmChain', 'conversationChain', 'mrklAgentChat', 'conversationalAgent']
const uploadProcessingNodes = ['chatOpenAI', 'chatAnthropic', 'awsChatBedrock', 'azureChatOpenAI']
const flowObj = JSON.parse(chatflow.flowData)
const imgUploadSizeAndTypes: IUploadFileSizeAndTypes[] = []
let isSpeechToTextEnabled = false
if (chatflow.speechToText) {
const speechToTextProviders = JSON.parse(chatflow.speechToText)
for (const provider in speechToTextProviders) {
if (provider !== 'none') {
const providerObj = speechToTextProviders[provider]
if (providerObj.status) {
isSpeechToTextEnabled = true
break
}
}
}
}
let isImageUploadAllowed = false
const nodes: IReactFlowNode[] = flowObj.nodes
/*
* Condition for isImageUploadAllowed
* 1.) one of the uploadAllowedNodes exists
* 2.) one of the uploadProcessingNodes exists + allowImageUploads is ON
*/
if (!nodes.some((node) => uploadAllowedNodes.includes(node.data.name))) {
return {
isSpeechToTextEnabled,
isImageUploadAllowed: false,
imgUploadSizeAndTypes
}
}
nodes.forEach((node: IReactFlowNode) => {
if (uploadProcessingNodes.indexOf(node.data.name) > -1) {
// TODO: for now the maxUploadSize is hardcoded to 5MB, we need to add it to the node properties
node.data.inputParams.map((param: INodeParams) => {
if (param.name === 'allowImageUploads' && node.data.inputs?.['allowImageUploads']) {
imgUploadSizeAndTypes.push({
fileTypes: 'image/gif;image/jpeg;image/png;image/webp;'.split(';'),
maxUploadSize: 5
})
isImageUploadAllowed = true
}
})
}
})
return {
isSpeechToTextEnabled,
isImageUploadAllowed,
imgUploadSizeAndTypes
}
}
+1 -1
View File
@@ -492,7 +492,7 @@ export const getVariableValue = (
isAcceptVariable = false
) => {
const isObject = typeof paramValue === 'object'
let returnVal = isObject ? JSON.stringify(paramValue) : paramValue
let returnVal = (isObject ? JSON.stringify(paramValue) : paramValue) ?? ''
const variableStack = []
const variableDict = {} as IVariableDict
let startIdx = 0
-3
View File
@@ -23,11 +23,8 @@ async function addRateLimiter(id: string, duration: number, limit: number, messa
export function getRateLimiter(req: Request, res: Response, next: NextFunction) {
const id = req.params.id
if (!rateLimiters[id]) return next()
const idRateLimiter = rateLimiters[id]
return idRateLimiter(req, res, next)
}
@@ -0,0 +1,16 @@
import { IChatMessageFeedback } from '../Interface'
import { getRunningExpressApp } from '../utils/getRunningExpressApp'
import { ChatMessageFeedback } from '../database/entities/ChatMessageFeedback'
/**
* Method that updates chat message feedback.
* @param {string} id
* @param {Partial<IChatMessageFeedback>} chatMessageFeedback
*/
export const utilUpdateChatMessageFeedback = async (id: string, chatMessageFeedback: Partial<IChatMessageFeedback>) => {
const appServer = getRunningExpressApp()
const newChatMessageFeedback = new ChatMessageFeedback()
Object.assign(newChatMessageFeedback, chatMessageFeedback)
await appServer.AppDataSource.getRepository(ChatMessageFeedback).update({ id }, chatMessageFeedback)
return { status: 'OK' }
}
+139
View File
@@ -0,0 +1,139 @@
import { Request, Response } from 'express'
import * as fs from 'fs'
import { ICommonObject } from 'flowise-components'
import telemetryService from '../services/telemetry'
import logger from '../utils/logger'
import {
buildFlow,
constructGraphs,
getAllConnectedNodes,
mapMimeTypeToInputField,
findMemoryNode,
getMemorySessionId,
getAppVersion,
getTelemetryFlowObj,
getStartingNodes
} from '../utils'
import { utilValidateKey } from './validateKey'
import { IncomingInput, INodeDirectedGraph, IReactFlowObject, chatType } from '../Interface'
import { ChatFlow } from '../database/entities/ChatFlow'
import { getRunningExpressApp } from '../utils/getRunningExpressApp'
export const upsertVector = async (req: Request, res: Response, isInternal: boolean = false) => {
try {
const appServer = getRunningExpressApp()
const chatflowid = req.params.id
let incomingInput: IncomingInput = req.body
const chatflow = await appServer.AppDataSource.getRepository(ChatFlow).findOneBy({
id: chatflowid
})
if (!chatflow) return res.status(404).send(`Chatflow ${chatflowid} not found`)
if (!isInternal) {
const isKeyValidated = await utilValidateKey(req, chatflow)
if (!isKeyValidated) return res.status(401).send('Unauthorized')
}
const files = (req.files as any[]) || []
if (files.length) {
const overrideConfig: ICommonObject = { ...req.body }
for (const file of files) {
const fileData = fs.readFileSync(file.path, { encoding: 'base64' })
const dataBase64String = `data:${file.mimetype};base64,${fileData},filename:${file.filename}`
const fileInputField = mapMimeTypeToInputField(file.mimetype)
if (overrideConfig[fileInputField]) {
overrideConfig[fileInputField] = JSON.stringify([...JSON.parse(overrideConfig[fileInputField]), dataBase64String])
} else {
overrideConfig[fileInputField] = JSON.stringify([dataBase64String])
}
}
incomingInput = {
question: req.body.question ?? 'hello',
overrideConfig,
history: [],
stopNodeId: req.body.stopNodeId
}
}
/*** Get chatflows and prepare data ***/
const flowData = chatflow.flowData
const parsedFlowData: IReactFlowObject = JSON.parse(flowData)
const nodes = parsedFlowData.nodes
const edges = parsedFlowData.edges
let stopNodeId = incomingInput?.stopNodeId ?? ''
let chatHistory = incomingInput?.history
let chatId = incomingInput.chatId ?? ''
let isUpsert = true
// Get session ID
const memoryNode = findMemoryNode(nodes, edges)
let sessionId = undefined
if (memoryNode) sessionId = getMemorySessionId(memoryNode, incomingInput, chatId, isInternal)
const vsNodes = nodes.filter(
(node) =>
node.data.category === 'Vector Stores' && !node.data.label.includes('Upsert') && !node.data.label.includes('Load Existing')
)
if (vsNodes.length > 1 && !stopNodeId) {
return res.status(500).send('There are multiple vector nodes, please provide stopNodeId in body request')
} else if (vsNodes.length === 1 && !stopNodeId) {
stopNodeId = vsNodes[0].data.id
} else if (!vsNodes.length && !stopNodeId) {
return res.status(500).send('No vector node found')
}
const { graph } = constructGraphs(nodes, edges, { isReversed: true })
const nodeIds = getAllConnectedNodes(graph, stopNodeId)
const filteredGraph: INodeDirectedGraph = {}
for (const key of nodeIds) {
if (Object.prototype.hasOwnProperty.call(graph, key)) {
filteredGraph[key] = graph[key]
}
}
const { startingNodeIds, depthQueue } = getStartingNodes(filteredGraph, stopNodeId)
await buildFlow(
startingNodeIds,
nodes,
edges,
filteredGraph,
depthQueue,
appServer.nodesPool.componentNodes,
incomingInput.question,
chatHistory,
chatId,
sessionId ?? '',
chatflowid,
appServer.AppDataSource,
incomingInput?.overrideConfig,
appServer.cachePool,
isUpsert,
stopNodeId
)
const startingNodes = nodes.filter((nd) => startingNodeIds.includes(nd.data.id))
await appServer.chatflowPool.add(chatflowid, undefined, startingNodes, incomingInput?.overrideConfig)
await telemetryService.createEvent({
name: `vector_upserted`,
data: {
version: await getAppVersion(),
chatlowId: chatflowid,
type: isInternal ? chatType.INTERNAL : chatType.EXTERNAL,
flowGraph: getTelemetryFlowObj(nodes, edges),
stopNodeId
}
})
return res.status(201).send('Successfully Upserted')
} catch (e: any) {
logger.error('[server]: Error:', e)
return res.status(500).send(e.message)
}
}
+26
View File
@@ -0,0 +1,26 @@
import { Request } from 'express'
import { ChatFlow } from '../database/entities/ChatFlow'
import { getAPIKeys, compareKeys } from './apiKey'
/**
* Validate API Key
* @param {Request} req
* @param {Response} res
* @param {ChatFlow} chatflow
*/
export const utilValidateKey = async (req: Request, chatflow: ChatFlow) => {
const chatFlowApiKeyId = chatflow.apikeyid
if (!chatFlowApiKeyId) return true
const authorizationHeader = (req.headers['Authorization'] as string) ?? (req.headers['authorization'] as string) ?? ''
if (chatFlowApiKeyId && !authorizationHeader) return false
const suppliedKey = authorizationHeader.split(`Bearer `).pop()
if (suppliedKey) {
const keys = await getAPIKeys()
const apiSecret = keys.find((key) => key.id === chatFlowApiKeyId)?.apiSecret
if (!compareKeys(apiSecret, suppliedKey)) return false
return true
}
return false
}