Merge branch 'main' into feature/LlamaIndex

# Conflicts:
#	packages/components/nodes/agents/ConversationalAgent/ConversationalAgent.ts
#	packages/components/nodes/agents/ConversationalRetrievalAgent/ConversationalRetrievalAgent.ts
#	packages/components/nodes/agents/OpenAIAssistant/OpenAIAssistant.ts
#	packages/components/nodes/agents/OpenAIFunctionAgent/OpenAIFunctionAgent.ts
#	packages/components/nodes/chains/ConversationChain/ConversationChain.ts
#	packages/components/nodes/chains/ConversationalRetrievalQAChain/ConversationalRetrievalQAChain.ts
#	packages/components/nodes/memory/BufferMemory/BufferMemory.ts
#	packages/components/nodes/memory/BufferWindowMemory/BufferWindowMemory.ts
#	packages/components/nodes/memory/ConversationSummaryMemory/ConversationSummaryMemory.ts
#	packages/components/nodes/memory/DynamoDb/DynamoDb.ts
#	packages/components/nodes/memory/MongoDBMemory/MongoDBMemory.ts
#	packages/components/nodes/memory/MotorheadMemory/MotorheadMemory.ts
#	packages/components/nodes/memory/RedisBackedChatMemory/RedisBackedChatMemory.ts
#	packages/components/nodes/memory/UpstashRedisBackedChatMemory/UpstashRedisBackedChatMemory.ts
#	packages/components/nodes/memory/ZepMemory/ZepMemory.ts
#	packages/components/src/utils.ts
#	packages/server/marketplaces/chatflows/Long Term Memory.json
#	packages/server/src/index.ts
#	packages/server/src/utils/index.ts
This commit is contained in:
Henry
2024-01-24 03:00:31 +00:00
468 changed files with 13562 additions and 3588 deletions
+1 -1
View File
@@ -16,7 +16,7 @@ export class ChatflowPool {
* @param {IReactFlowNode[]} startingNodes
* @param {ICommonObject} overrideConfig
*/
add(chatflowid: string, endingNodeData: INodeData, startingNodes: IReactFlowNode[], overrideConfig?: ICommonObject) {
add(chatflowid: string, endingNodeData: INodeData | undefined, startingNodes: IReactFlowNode[], overrideConfig?: ICommonObject) {
this.activeChatflows[chatflowid] = {
startingNodes,
endingNodeData,
+1
View File
@@ -46,6 +46,7 @@ export const init = async (): Promise<void> => {
username: process.env.DATABASE_USER,
password: process.env.DATABASE_PASSWORD,
database: process.env.DATABASE_NAME,
ssl: process.env.DATABASE_SSL === 'true',
synchronize: false,
migrationsRun: false,
entities: Object.values(entities),
+10 -1
View File
@@ -68,6 +68,15 @@ export interface ICredential {
createdDate: Date
}
export interface IVariable {
id: string
name: string
value: string
type: string
updatedDate: Date
createdDate: Date
}
export interface IComponentNodes {
[key: string]: INode
}
@@ -172,7 +181,7 @@ export interface IncomingInput {
export interface IActiveChatflows {
[key: string]: {
startingNodes: IReactFlowNode[]
endingNodeData: INodeData
endingNodeData?: INodeData
inSync: boolean
overrideConfig?: ICommonObject
}
+7 -1
View File
@@ -35,10 +35,12 @@ export default class Start extends Command {
DATABASE_NAME: Flags.string(),
DATABASE_USER: Flags.string(),
DATABASE_PASSWORD: Flags.string(),
DATABASE_SSL: Flags.string(),
LANGCHAIN_TRACING_V2: Flags.string(),
LANGCHAIN_ENDPOINT: Flags.string(),
LANGCHAIN_API_KEY: Flags.string(),
LANGCHAIN_PROJECT: Flags.string()
LANGCHAIN_PROJECT: Flags.string(),
DISABLE_FLOWISE_TELEMETRY: Flags.string()
}
async stopProcess() {
@@ -104,6 +106,7 @@ export default class Start extends Command {
if (flags.DATABASE_NAME) process.env.DATABASE_NAME = flags.DATABASE_NAME
if (flags.DATABASE_USER) process.env.DATABASE_USER = flags.DATABASE_USER
if (flags.DATABASE_PASSWORD) process.env.DATABASE_PASSWORD = flags.DATABASE_PASSWORD
if (flags.DATABASE_SSL) process.env.DATABASE_SSL = flags.DATABASE_SSL
// Langsmith tracing
if (flags.LANGCHAIN_TRACING_V2) process.env.LANGCHAIN_TRACING_V2 = flags.LANGCHAIN_TRACING_V2
@@ -111,6 +114,9 @@ export default class Start extends Command {
if (flags.LANGCHAIN_API_KEY) process.env.LANGCHAIN_API_KEY = flags.LANGCHAIN_API_KEY
if (flags.LANGCHAIN_PROJECT) process.env.LANGCHAIN_PROJECT = flags.LANGCHAIN_PROJECT
// Telemetry
if (flags.DISABLE_FLOWISE_TELEMETRY) process.env.DISABLE_FLOWISE_TELEMETRY = flags.DISABLE_FLOWISE_TELEMETRY
await (async () => {
try {
logger.info('Starting Flowise...')
@@ -0,0 +1,24 @@
/* eslint-disable */
import { Entity, Column, CreateDateColumn, UpdateDateColumn, PrimaryGeneratedColumn } from 'typeorm'
import { IVariable } from '../../Interface'
@Entity()
export class Variable implements IVariable {
@PrimaryGeneratedColumn('uuid')
id: string
@Column()
name: string
@Column({ nullable: true, type: 'text' })
value: string
@Column({ default: 'string', type: 'text' })
type: string
@CreateDateColumn()
createdDate: Date
@UpdateDateColumn()
updatedDate: Date
}
@@ -3,11 +3,13 @@ import { ChatMessage } from './ChatMessage'
import { Credential } from './Credential'
import { Tool } from './Tool'
import { Assistant } from './Assistant'
import { Variable } from './Variable'
export const entities = {
ChatFlow,
ChatMessage,
Credential,
Tool,
Assistant
Assistant,
Variable
}
@@ -0,0 +1,21 @@
import { MigrationInterface, QueryRunner } from 'typeorm'
export class AddVariableEntity1699325775451 implements MigrationInterface {
public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(
`CREATE TABLE IF NOT EXISTS \`variable\` (
\`id\` varchar(36) NOT NULL,
\`name\` varchar(255) NOT NULL,
\`value\` text NOT NULL,
\`type\` varchar(255) DEFAULT NULL,
\`createdDate\` datetime(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
\`updatedDate\` datetime(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6),
PRIMARY KEY (\`id\`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;`
)
}
public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`DROP TABLE variable`)
}
}
@@ -10,6 +10,7 @@ import { AddAssistantEntity1699325775451 } from './1699325775451-AddAssistantEnt
import { AddUsedToolsToChatMessage1699481607341 } from './1699481607341-AddUsedToolsToChatMessage'
import { AddCategoryToChatFlow1699900910291 } from './1699900910291-AddCategoryToChatFlow'
import { AddFileAnnotationsToChatMessage1700271021237 } from './1700271021237-AddFileAnnotationsToChatMessage'
import { AddVariableEntity1699325775451 } from './1702200925471-AddVariableEntity'
export const mysqlMigrations = [
Init1693840429259,
@@ -23,5 +24,6 @@ export const mysqlMigrations = [
AddAssistantEntity1699325775451,
AddUsedToolsToChatMessage1699481607341,
AddCategoryToChatFlow1699900910291,
AddFileAnnotationsToChatMessage1700271021237
AddFileAnnotationsToChatMessage1700271021237,
AddVariableEntity1699325775451
]
@@ -0,0 +1,21 @@
import { MigrationInterface, QueryRunner } from 'typeorm'
export class AddVariableEntity1699325775451 implements MigrationInterface {
public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(
`CREATE TABLE IF NOT EXISTS variable (
id uuid NOT NULL DEFAULT uuid_generate_v4(),
"name" varchar NOT NULL,
"value" text NOT NULL,
"type" text NULL,
"createdDate" timestamp NOT NULL DEFAULT now(),
"updatedDate" timestamp NOT NULL DEFAULT now(),
CONSTRAINT "PK_98419043dd704f54-9830ab78f8" PRIMARY KEY (id)
);`
)
}
public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`DROP TABLE variable`)
}
}
@@ -10,6 +10,7 @@ import { AddAssistantEntity1699325775451 } from './1699325775451-AddAssistantEnt
import { AddUsedToolsToChatMessage1699481607341 } from './1699481607341-AddUsedToolsToChatMessage'
import { AddCategoryToChatFlow1699900910291 } from './1699900910291-AddCategoryToChatFlow'
import { AddFileAnnotationsToChatMessage1700271021237 } from './1700271021237-AddFileAnnotationsToChatMessage'
import { AddVariableEntity1699325775451 } from './1702200925471-AddVariableEntity'
export const postgresMigrations = [
Init1693891895163,
@@ -23,5 +24,6 @@ export const postgresMigrations = [
AddAssistantEntity1699325775451,
AddUsedToolsToChatMessage1699481607341,
AddCategoryToChatFlow1699900910291,
AddFileAnnotationsToChatMessage1700271021237
AddFileAnnotationsToChatMessage1700271021237,
AddVariableEntity1699325775451
]
@@ -0,0 +1,13 @@
import { MigrationInterface, QueryRunner } from 'typeorm'
export class AddVariableEntity1699325775451 implements MigrationInterface {
public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(
`CREATE TABLE IF NOT EXISTS "variable" ("id" varchar PRIMARY KEY NOT NULL, "name" text NOT NULL, "value" text NOT NULL, "type" varchar, "createdDate" datetime NOT NULL DEFAULT (datetime('now')), "updatedDate" datetime NOT NULL DEFAULT (datetime('now')));`
)
}
public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`DROP TABLE variable`)
}
}
@@ -10,6 +10,7 @@ import { AddAssistantEntity1699325775451 } from './1699325775451-AddAssistantEnt
import { AddUsedToolsToChatMessage1699481607341 } from './1699481607341-AddUsedToolsToChatMessage'
import { AddCategoryToChatFlow1699900910291 } from './1699900910291-AddCategoryToChatFlow'
import { AddFileAnnotationsToChatMessage1700271021237 } from './1700271021237-AddFileAnnotationsToChatMessage'
import { AddVariableEntity1699325775451 } from './1702200925471-AddVariableEntity'
export const sqliteMigrations = [
Init1693835579790,
@@ -23,5 +24,6 @@ export const sqliteMigrations = [
AddAssistantEntity1699325775451,
AddUsedToolsToChatMessage1699481607341,
AddCategoryToChatFlow1699900910291,
AddFileAnnotationsToChatMessage1700271021237
AddFileAnnotationsToChatMessage1700271021237,
AddVariableEntity1699325775451
]
+422 -135
View File
@@ -20,13 +20,14 @@ import {
ICredentialReturnResponse,
chatType,
IChatMessage,
IReactFlowEdge
IDepthQueue,
INodeDirectedGraph
} from './Interface'
import {
getNodeModulesPackagePath,
getStartingNodes,
buildLangchain,
getEndingNode,
getEndingNodes,
constructGraphs,
resolveVariables,
isStartNodeDependOnInput,
@@ -39,10 +40,14 @@ import {
decryptCredentialData,
replaceInputsWithConfig,
getEncryptionKey,
replaceMemorySessionId,
getMemorySessionId,
getUserHome,
replaceChatHistory,
clearSessionMemory
getSessionChatHistory,
getAllConnectedNodes,
clearSessionMemory,
findMemoryNode,
getTelemetryFlowObj,
getAppVersion
} from './utils'
import { cloneDeep, omit, uniqWith, isEqual } from 'lodash'
import { getDataSource } from './DataSource'
@@ -54,15 +59,22 @@ import { Tool } from './database/entities/Tool'
import { Assistant } from './database/entities/Assistant'
import { ChatflowPool } from './ChatflowPool'
import { CachePool } from './CachePool'
import { ICommonObject, IMessage, INodeOptionsValue } from 'flowise-components'
import { ICommonObject, IMessage, INodeOptionsValue, handleEscapeCharacters } from 'flowise-components'
import { createRateLimiter, getRateLimiter, initializeRateLimiter } from './utils/rateLimit'
import { addAPIKey, compareKeys, deleteAPIKey, getApiKey, getAPIKeys, updateAPIKey } from './utils/apiKey'
import { sanitizeMiddleware } from './utils/XSS'
import axios from 'axios'
import { Client } from 'langchainhub'
import { parsePrompt } from './utils/hub'
import { Telemetry } from './utils/telemetry'
import { Variable } from './database/entities/Variable'
export class App {
app: express.Application
nodesPool: NodesPool
chatflowPool: ChatflowPool
cachePool: CachePool
telemetry: Telemetry
AppDataSource = getDataSource()
constructor() {
@@ -97,6 +109,9 @@ export class App {
// Initialize cache pool
this.cachePool = new CachePool()
// Initialize telemetry
this.telemetry = new Telemetry()
})
.catch((err) => {
logger.error('❌ [server]: Error during Data Source initialization:', err)
@@ -114,9 +129,15 @@ export class App {
// Allow access from *
this.app.use(cors())
// Switch off the default 'X-Powered-By: Express' header
this.app.disable('x-powered-by')
// Add the expressRequestLogger middleware to log all requests
this.app.use(expressRequestLogger)
// Add the sanitizeMiddleware to guard against XSS
this.app.use(sanitizeMiddleware)
if (process.env.FLOWISE_USERNAME && process.env.FLOWISE_PASSWORD) {
const username = process.env.FLOWISE_USERNAME
const password = process.env.FLOWISE_PASSWORD
@@ -127,6 +148,7 @@ export class App {
'/api/v1/verify/apikey/',
'/api/v1/chatflows/apikey/',
'/api/v1/public-chatflows',
'/api/v1/public-chatbotConfig',
'/api/v1/prediction/',
'/api/v1/vector/upsert/',
'/api/v1/node-icon/',
@@ -189,7 +211,7 @@ export class App {
// Get component credential via name
this.app.get('/api/v1/components-credentials/:name', (req: Request, res: Response) => {
if (!req.params.name.includes('&')) {
if (!req.params.name.includes('&amp;')) {
if (Object.prototype.hasOwnProperty.call(this.nodesPool.componentCredentials, req.params.name)) {
return res.json(this.nodesPool.componentCredentials[req.params.name])
} else {
@@ -197,7 +219,7 @@ export class App {
}
} else {
const returnResponse = []
for (const name of req.params.name.split('&')) {
for (const name of req.params.name.split('&amp;')) {
if (Object.prototype.hasOwnProperty.call(this.nodesPool.componentCredentials, name)) {
returnResponse.push(this.nodesPool.componentCredentials[name])
} else {
@@ -269,6 +291,35 @@ export class App {
}
})
// execute custom function node
this.app.post('/api/v1/node-custom-function', async (req: Request, res: Response) => {
const body = req.body
const nodeData = { inputs: body }
if (Object.prototype.hasOwnProperty.call(this.nodesPool.componentNodes, 'customFunction')) {
try {
const nodeInstanceFilePath = this.nodesPool.componentNodes['customFunction'].filePath as string
const nodeModule = await import(nodeInstanceFilePath)
const newNodeInstance = new nodeModule.nodeClass()
const options: ICommonObject = {
appDataSource: this.AppDataSource,
databaseEntities,
logger
}
const returnData = await newNodeInstance.init(nodeData, '', options)
const result = typeof returnData === 'string' ? handleEscapeCharacters(returnData, true) : returnData
return res.json(result)
} catch (error) {
return res.status(500).send(`Error running custom function: ${error}`)
}
} else {
res.status(404).send(`Node customFunction not found`)
return
}
})
// ----------------------------------------
// Chatflows
// ----------------------------------------
@@ -317,6 +368,24 @@ export class App {
return res.status(404).send(`Chatflow ${req.params.id} not found`)
})
// Get specific chatflow chatbotConfig via id (PUBLIC endpoint, used to retrieve config for embedded chat)
// Safe as public endpoint as chatbotConfig doesn't contain sensitive credential
this.app.get('/api/v1/public-chatbotConfig/:id', async (req: Request, res: Response) => {
const chatflow = await this.AppDataSource.getRepository(ChatFlow).findOneBy({
id: req.params.id
})
if (!chatflow) return res.status(404).send(`Chatflow ${req.params.id} not found`)
if (chatflow.chatbotConfig) {
try {
const parsedConfig = JSON.parse(chatflow.chatbotConfig)
return res.json(parsedConfig)
} catch (e) {
return res.status(500).send(`Error parsing Chatbot Config for Chatflow ${req.params.id}`)
}
}
return res.status(200).send('OK')
})
// Save chatflow
this.app.post('/api/v1/chatflows', async (req: Request, res: Response) => {
const body = req.body
@@ -326,6 +395,12 @@ export class App {
const chatflow = this.AppDataSource.getRepository(ChatFlow).create(newChatFlow)
const results = await this.AppDataSource.getRepository(ChatFlow).save(chatflow)
await this.telemetry.sendTelemetry('chatflow_created', {
version: await getAppVersion(),
chatlowId: results.id,
flowGraph: getTelemetryFlowObj(JSON.parse(results.flowData)?.nodes, JSON.parse(results.flowData)?.edges)
})
return res.json(results)
})
@@ -380,24 +455,29 @@ export class App {
const edges = parsedFlowData.edges
const { graph, nodeDependencies } = constructGraphs(nodes, edges)
const endingNodeId = getEndingNode(nodeDependencies, graph)
if (!endingNodeId) return res.status(500).send(`Ending node ${endingNodeId} not found`)
const endingNodeIds = getEndingNodes(nodeDependencies, graph)
if (!endingNodeIds.length) return res.status(500).send(`Ending nodes not found`)
const endingNodeData = nodes.find((nd) => nd.id === endingNodeId)?.data
if (!endingNodeData) return res.status(500).send(`Ending node ${endingNodeId} data not found`)
const endingNodes = nodes.filter((nd) => endingNodeIds.includes(nd.id))
if (
endingNodeData &&
endingNodeData.category !== 'Chains' &&
endingNodeData.category !== 'Agents' &&
endingNodeData.category !== 'Engine'
) {
return res.status(500).send(`Ending node must be either a Chain or Agent`)
let isStreaming = false
for (const endingNode of endingNodes) {
const endingNodeData = endingNode.data
if (!endingNodeData) return res.status(500).send(`Ending node ${endingNode.id} data not found`)
if (
endingNodeData &&
endingNodeData.category !== 'Chains' &&
endingNodeData.category !== 'Agents' &&
endingNodeData.category !== 'Engine'
) {
return res.status(500).send(`Ending node must be either a Chain or Agent`)
}
isStreaming = isFlowValidForStream(nodes, endingNodeData)
}
const obj = {
isStreaming: isFlowValidForStream(nodes, endingNodeData)
}
const obj = { isStreaming }
return res.json(obj)
})
@@ -466,7 +546,7 @@ export class App {
res.status(404).send(`Chatflow ${chatflowid} not found`)
return
}
const chatId = (req.query?.chatId as string) ?? (await getChatId(chatflowid))
const chatId = req.query?.chatId as string
const memoryType = req.query?.memoryType as string | undefined
const sessionId = req.query?.sessionId as string | undefined
const chatType = req.query?.chatType as string | undefined
@@ -476,17 +556,22 @@ export class App {
const parsedFlowData: IReactFlowObject = JSON.parse(flowData)
const nodes = parsedFlowData.nodes
await clearSessionMemory(
nodes,
this.nodesPool.componentNodes,
chatId,
this.AppDataSource,
sessionId,
memoryType,
isClearFromViewMessageDialog
)
try {
await clearSessionMemory(
nodes,
this.nodesPool.componentNodes,
chatId,
this.AppDataSource,
sessionId,
memoryType,
isClearFromViewMessageDialog
)
} catch (e) {
return res.status(500).send('Error clearing chat messages')
}
const deleteOptions: FindOptionsWhere<ChatMessage> = { chatflowid, chatId }
const deleteOptions: FindOptionsWhere<ChatMessage> = { chatflowid }
if (chatId) deleteOptions.chatId = chatId
if (memoryType) deleteOptions.memoryType = memoryType
if (sessionId) deleteOptions.sessionId = sessionId
if (chatType) deleteOptions.chatType = chatType
@@ -574,7 +659,7 @@ export class App {
return res.json(result)
})
// Delete all chatmessages from chatflowid
// Delete all credentials from chatflowid
this.app.delete('/api/v1/credentials/:id', async (req: Request, res: Response) => {
const results = await this.AppDataSource.getRepository(Credential).delete({ id: req.params.id })
return res.json(results)
@@ -607,6 +692,12 @@ export class App {
const tool = this.AppDataSource.getRepository(Tool).create(newTool)
const results = await this.AppDataSource.getRepository(Tool).save(tool)
await this.telemetry.sendTelemetry('tool_created', {
version: await getAppVersion(),
toolId: results.id,
toolName: results.name
})
return res.json(results)
})
@@ -813,6 +904,11 @@ export class App {
const assistant = this.AppDataSource.getRepository(Assistant).create(newAssistant)
const results = await this.AppDataSource.getRepository(Assistant).save(assistant)
await this.telemetry.sendTelemetry('assistant_created', {
version: await getAppVersion(),
assistantId: results.id
})
return res.json(results)
})
@@ -965,6 +1061,12 @@ export class App {
// Download file from assistant
this.app.post('/api/v1/openai-assistants-file', async (req: Request, res: Response) => {
const filePath = path.join(getUserHome(), '.flowise', 'openai-assistant', req.body.fileName)
//raise error if file path is not absolute
if (!path.isAbsolute(filePath)) return res.status(500).send(`Invalid file path`)
//raise error if file path contains '..'
if (filePath.includes('..')) return res.status(500).send(`Invalid file path`)
//only return from the .flowise openai-assistant folder
if (!(filePath.includes('.flowise') && filePath.includes('openai-assistant'))) return res.status(500).send(`Invalid file path`)
res.setHeader('Content-Disposition', 'attachment; filename=' + path.basename(filePath))
const fileStream = fs.createReadStream(filePath)
fileStream.pipe(res)
@@ -1029,12 +1131,41 @@ export class App {
upload.array('files'),
(req: Request, res: Response, next: NextFunction) => getRateLimiter(req, res, next),
async (req: Request, res: Response) => {
await this.buildChatflow(req, res, undefined, false, true)
await this.upsertVector(req, res)
}
)
this.app.post('/api/v1/vector/internal-upsert/:id', async (req: Request, res: Response) => {
await this.buildChatflow(req, res, undefined, true, true)
await this.upsertVector(req, res, true)
})
// ----------------------------------------
// Prompt from Hub
// ----------------------------------------
this.app.post('/api/v1/load-prompt', async (req: Request, res: Response) => {
try {
let hub = new Client()
const prompt = await hub.pull(req.body.promptName)
const templates = parsePrompt(prompt)
return res.json({ status: 'OK', prompt: req.body.promptName, templates: templates })
} catch (e: any) {
return res.json({ status: 'ERROR', prompt: req.body.promptName, error: e?.message })
}
})
this.app.post('/api/v1/prompts-list', async (req: Request, res: Response) => {
try {
const tags = req.body.tags ? `tags=${req.body.tags}` : ''
// Default to 100, TODO: add pagination and use offset & limit
const url = `https://api.hub.langchain.com/repos/?limit=100&${tags}has_commits=true&sort_field=num_likes&sort_direction=desc&is_archived=false`
axios.get(url).then((response) => {
if (response.data.repos) {
return res.json({ status: 'OK', repos: response.data.repos })
}
})
} catch (e: any) {
return res.json({ status: 'ERROR', repos: [] })
}
})
// ----------------------------------------
@@ -1106,6 +1237,47 @@ export class App {
return res.json(templates)
})
// ----------------------------------------
// Variables
// ----------------------------------------
this.app.get('/api/v1/variables', async (req: Request, res: Response) => {
const variables = await getDataSource().getRepository(Variable).find()
return res.json(variables)
})
// Create new variable
this.app.post('/api/v1/variables', async (req: Request, res: Response) => {
const body = req.body
const newVariable = new Variable()
Object.assign(newVariable, body)
const variable = this.AppDataSource.getRepository(Variable).create(newVariable)
const results = await this.AppDataSource.getRepository(Variable).save(variable)
return res.json(results)
})
// Update variable
this.app.put('/api/v1/variables/:id', async (req: Request, res: Response) => {
const variable = await this.AppDataSource.getRepository(Variable).findOneBy({
id: req.params.id
})
if (!variable) return res.status(404).send(`Variable ${req.params.id} not found`)
const body = req.body
const updateVariable = new Variable()
Object.assign(updateVariable, body)
this.AppDataSource.getRepository(Variable).merge(variable, updateVariable)
const result = await this.AppDataSource.getRepository(Variable).save(variable)
return res.json(result)
})
// Delete variable via id
this.app.delete('/api/v1/variables/:id', async (req: Request, res: Response) => {
const results = await this.AppDataSource.getRepository(Variable).delete({ id: req.params.id })
return res.json(results)
})
// ----------------------------------------
// API Keys
// ----------------------------------------
@@ -1262,24 +1434,123 @@ export class App {
return await this.AppDataSource.getRepository(ChatMessage).save(chatmessage)
}
/**
* Method that find memory label that is connected within chatflow
* In a chatflow, there should only be 1 memory node
* @param {IReactFlowNode[]} nodes
* @param {IReactFlowEdge[]} edges
* @returns {string | undefined}
*/
findMemoryLabel(nodes: IReactFlowNode[], edges: IReactFlowEdge[]): IReactFlowNode | undefined {
const memoryNodes = nodes.filter((node) => node.data.category === 'Memory')
const memoryNodeIds = memoryNodes.map((mem) => mem.data.id)
async upsertVector(req: Request, res: Response, isInternal: boolean = false) {
try {
const chatflowid = req.params.id
let incomingInput: IncomingInput = req.body
for (const edge of edges) {
if (memoryNodeIds.includes(edge.source)) {
const memoryNode = nodes.find((node) => node.data.id === edge.source)
return memoryNode
const chatflow = await this.AppDataSource.getRepository(ChatFlow).findOneBy({
id: chatflowid
})
if (!chatflow) return res.status(404).send(`Chatflow ${chatflowid} not found`)
if (!isInternal) {
const isKeyValidated = await this.validateKey(req, chatflow)
if (!isKeyValidated) return res.status(401).send('Unauthorized')
}
const files = (req.files as any[]) || []
if (files.length) {
const overrideConfig: ICommonObject = { ...req.body }
for (const file of files) {
const fileData = fs.readFileSync(file.path, { encoding: 'base64' })
const dataBase64String = `data:${file.mimetype};base64,${fileData},filename:${file.filename}`
const fileInputField = mapMimeTypeToInputField(file.mimetype)
if (overrideConfig[fileInputField]) {
overrideConfig[fileInputField] = JSON.stringify([...JSON.parse(overrideConfig[fileInputField]), dataBase64String])
} else {
overrideConfig[fileInputField] = JSON.stringify([dataBase64String])
}
}
incomingInput = {
question: req.body.question ?? 'hello',
overrideConfig,
history: [],
stopNodeId: req.body.stopNodeId
}
}
/*** Get chatflows and prepare data ***/
const flowData = chatflow.flowData
const parsedFlowData: IReactFlowObject = JSON.parse(flowData)
const nodes = parsedFlowData.nodes
const edges = parsedFlowData.edges
let stopNodeId = incomingInput?.stopNodeId ?? ''
let chatHistory = incomingInput?.history
let chatId = incomingInput.chatId ?? ''
let isUpsert = true
// Get session ID
const memoryNode = findMemoryNode(nodes, edges)
let sessionId = undefined
if (memoryNode) sessionId = getMemorySessionId(memoryNode, incomingInput, chatId, isInternal)
const vsNodes = nodes.filter(
(node) =>
node.data.category === 'Vector Stores' &&
!node.data.label.includes('Upsert') &&
!node.data.label.includes('Load Existing')
)
if (vsNodes.length > 1 && !stopNodeId) {
return res.status(500).send('There are multiple vector nodes, please provide stopNodeId in body request')
} else if (vsNodes.length === 1 && !stopNodeId) {
stopNodeId = vsNodes[0].data.id
} else if (!vsNodes.length && !stopNodeId) {
return res.status(500).send('No vector node found')
}
const { graph } = constructGraphs(nodes, edges, { isReversed: true })
const nodeIds = getAllConnectedNodes(graph, stopNodeId)
const filteredGraph: INodeDirectedGraph = {}
for (const key of nodeIds) {
if (Object.prototype.hasOwnProperty.call(graph, key)) {
filteredGraph[key] = graph[key]
}
}
const { startingNodeIds, depthQueue } = getStartingNodes(filteredGraph, stopNodeId)
await buildLangchain(
startingNodeIds,
nodes,
edges,
filteredGraph,
depthQueue,
this.nodesPool.componentNodes,
incomingInput.question,
chatHistory,
chatId,
sessionId ?? '',
chatflowid,
this.AppDataSource,
incomingInput?.overrideConfig,
this.cachePool,
isUpsert,
stopNodeId
)
const startingNodes = nodes.filter((nd) => startingNodeIds.includes(nd.data.id))
this.chatflowPool.add(chatflowid, undefined, startingNodes, incomingInput?.overrideConfig)
await this.telemetry.sendTelemetry('vector_upserted', {
version: await getAppVersion(),
chatlowId: chatflowid,
type: isInternal ? chatType.INTERNAL : chatType.EXTERNAL,
flowGraph: getTelemetryFlowObj(nodes, edges),
stopNodeId
})
return res.status(201).send('Successfully Upserted')
} catch (e: any) {
logger.error('[server]: Error:', e)
return res.status(500).send(e.message)
}
return undefined
}
/**
@@ -1290,7 +1561,7 @@ export class App {
* @param {boolean} isInternal
* @param {boolean} isUpsert
*/
async buildChatflow(req: Request, res: Response, socketIO?: Server, isInternal: boolean = false, isUpsert: boolean = false) {
async buildChatflow(req: Request, res: Response, socketIO?: Server, isInternal: boolean = false) {
try {
const chatflowid = req.params.id
let incomingInput: IncomingInput = req.body
@@ -1331,8 +1602,7 @@ export class App {
question: req.body.question ?? 'hello',
overrideConfig,
history: [],
socketIOClientId: req.body.socketIOClientId,
stopNodeId: req.body.stopNodeId
socketIOClientId: req.body.socketIOClientId
}
}
@@ -1342,28 +1612,35 @@ export class App {
const nodes = parsedFlowData.nodes
const edges = parsedFlowData.edges
/* Reuse the flow without having to rebuild (to avoid duplicated upsert, recomputation) when all these conditions met:
// Get session ID
const memoryNode = findMemoryNode(nodes, edges)
const memoryType = memoryNode?.data.label
let sessionId = undefined
if (memoryNode) sessionId = getMemorySessionId(memoryNode, incomingInput, chatId, isInternal)
/* Reuse the flow without having to rebuild (to avoid duplicated upsert, recomputation, reinitialization of memory) when all these conditions met:
* - Node Data already exists in pool
* - Still in sync (i.e the flow has not been modified since)
* - Existing overrideConfig and new overrideConfig are the same
* - Flow doesn't start with/contain nodes that depend on incomingInput.question
* TODO: convert overrideConfig to hash when we no longer store base64 string but filepath
***/
const isFlowReusable = () => {
return (
Object.prototype.hasOwnProperty.call(this.chatflowPool.activeChatflows, chatflowid) &&
this.chatflowPool.activeChatflows[chatflowid].inSync &&
this.chatflowPool.activeChatflows[chatflowid].endingNodeData &&
isSameOverrideConfig(
isInternal,
this.chatflowPool.activeChatflows[chatflowid].overrideConfig,
incomingInput.overrideConfig
) &&
!isStartNodeDependOnInput(this.chatflowPool.activeChatflows[chatflowid].startingNodes, nodes) &&
!isUpsert
!isStartNodeDependOnInput(this.chatflowPool.activeChatflows[chatflowid].startingNodes, nodes)
)
}
if (isFlowReusable()) {
nodeToExecuteData = this.chatflowPool.activeChatflows[chatflowid].endingNodeData
nodeToExecuteData = this.chatflowPool.activeChatflows[chatflowid].endingNodeData as INodeData
isStreamValid = isFlowValidForStream(nodes, nodeToExecuteData)
logger.debug(
`[server]: Reuse existing chatflow ${chatflowid} with ending node ${nodeToExecuteData.label} (${nodeToExecuteData.id})`
@@ -1372,79 +1649,101 @@ export class App {
/*** Get Ending Node with Directed Graph ***/
const { graph, nodeDependencies } = constructGraphs(nodes, edges)
const directedGraph = graph
const endingNodeId = getEndingNode(nodeDependencies, directedGraph)
if (!endingNodeId) return res.status(500).send(`Ending node ${endingNodeId} not found`)
const endingNodeIds = getEndingNodes(nodeDependencies, directedGraph)
if (!endingNodeIds.length) return res.status(500).send(`Ending nodes not found`)
const endingNodeData = nodes.find((nd) => nd.id === endingNodeId)?.data
if (!endingNodeData) return res.status(500).send(`Ending node ${endingNodeId} data not found`)
const endingNodes = nodes.filter((nd) => endingNodeIds.includes(nd.id))
for (const endingNode of endingNodes) {
const endingNodeData = endingNode.data
if (!endingNodeData) return res.status(500).send(`Ending node ${endingNode.id} data not found`)
if (
endingNodeData &&
endingNodeData.category !== 'Chains' &&
endingNodeData.category !== 'Agents' &&
endingNodeData.category !== 'Engine' &&
!isUpsert
) {
return res.status(500).send(`Ending node must be either a Chain or Agent`)
if (
endingNodeData &&
endingNodeData.category !== 'Chains' &&
endingNodeData.category !== 'Agents' &&
endingNodeData.category !== 'Engine'
) {
return res.status(500).send(`Ending node must be either a Chain or Agent`)
}
if (
endingNodeData.outputs &&
Object.keys(endingNodeData.outputs).length &&
!Object.values(endingNodeData.outputs).includes(endingNodeData.name)
) {
return res
.status(500)
.send(
`Output of ${endingNodeData.label} (${endingNodeData.id}) must be ${endingNodeData.label}, can't be an Output Prediction`
)
}
isStreamValid = isFlowValidForStream(nodes, endingNodeData)
}
if (
endingNodeData.outputs &&
Object.keys(endingNodeData.outputs).length &&
!Object.values(endingNodeData.outputs).includes(endingNodeData.name) &&
!isUpsert
) {
return res
.status(500)
.send(
`Output of ${endingNodeData.label} (${endingNodeData.id}) must be ${endingNodeData.label}, can't be an Output Prediction`
)
}
let chatHistory: IMessage[] = incomingInput.history ?? []
isStreamValid = isFlowValidForStream(nodes, endingNodeData)
// When {{chat_history}} is used in Prompt Template, fetch the chat conversations from memory node
for (const endingNode of endingNodes) {
const endingNodeData = endingNode.data
let chatHistory: IMessage[] = incomingInput.history
if (!endingNodeData.inputs?.memory) continue
// If chatHistory is empty, and sessionId/chatId is present, replace it
if (
endingNodeData.inputs?.memory &&
!incomingInput.history &&
(incomingInput.chatId || incomingInput.overrideConfig?.sessionId)
) {
const memoryNodeId = endingNodeData.inputs?.memory.split('.')[0].replace('{{', '')
const memoryNode = nodes.find((node) => node.data.id === memoryNodeId)
if (memoryNode) {
chatHistory = await replaceChatHistory(memoryNode, incomingInput, this.AppDataSource, databaseEntities, logger)
if (!memoryNode) continue
if (!chatHistory.length && (incomingInput.chatId || incomingInput.overrideConfig?.sessionId)) {
chatHistory = await getSessionChatHistory(
memoryNode,
this.nodesPool.componentNodes,
incomingInput,
this.AppDataSource,
databaseEntities,
logger
)
}
}
/*** Get Starting Nodes with Non-Directed Graph ***/
const constructedObj = constructGraphs(nodes, edges, true)
/*** Get Starting Nodes with Reversed Graph ***/
const constructedObj = constructGraphs(nodes, edges, { isReversed: true })
const nonDirectedGraph = constructedObj.graph
const { startingNodeIds, depthQueue } = getStartingNodes(nonDirectedGraph, endingNodeId)
let startingNodeIds: string[] = []
let depthQueue: IDepthQueue = {}
for (const endingNodeId of endingNodeIds) {
const res = getStartingNodes(nonDirectedGraph, endingNodeId)
startingNodeIds.push(...res.startingNodeIds)
depthQueue = Object.assign(depthQueue, res.depthQueue)
}
startingNodeIds = [...new Set(startingNodeIds)]
const startingNodes = nodes.filter((nd) => startingNodeIds.includes(nd.id))
logger.debug(`[server]: Start building chatflow ${chatflowid}`)
/*** BFS to traverse from Starting Nodes to Ending Node ***/
const reactFlowNodes = await buildLangchain(
startingNodeIds,
nodes,
edges,
graph,
depthQueue,
this.nodesPool.componentNodes,
incomingInput.question,
chatHistory,
chatId,
sessionId ?? '',
chatflowid,
this.AppDataSource,
incomingInput?.overrideConfig,
this.cachePool,
isUpsert,
incomingInput.stopNodeId
this.cachePool
)
if (isUpsert) return res.status(201).send('Successfully Upserted')
const nodeToExecute = reactFlowNodes.find((node: IReactFlowNode) => node.id === endingNodeId)
if (!nodeToExecute) return res.status(404).send(`Node ${endingNodeId} not found`)
const nodeToExecute =
endingNodeIds.length === 1
? reactFlowNodes.find((node: IReactFlowNode) => endingNodeIds[0] === node.id)
: reactFlowNodes[reactFlowNodes.length - 1]
if (!nodeToExecute) return res.status(404).send(`Node not found`)
if (incomingInput.overrideConfig) {
nodeToExecute.data = replaceInputsWithConfig(nodeToExecute.data, incomingInput.overrideConfig)
@@ -1458,37 +1757,35 @@ export class App {
)
nodeToExecuteData = reactFlowNodeData
const startingNodes = nodes.filter((nd) => startingNodeIds.includes(nd.id))
this.chatflowPool.add(chatflowid, nodeToExecuteData, startingNodes, incomingInput?.overrideConfig)
}
const nodeInstanceFilePath = this.nodesPool.componentNodes[nodeToExecuteData.name].filePath as string
const nodeModule = await import(nodeInstanceFilePath)
const nodeInstance = new nodeModule.nodeClass()
logger.debug(`[server]: Running ${nodeToExecuteData.label} (${nodeToExecuteData.id})`)
let sessionId = undefined
if (nodeToExecuteData.instance) sessionId = replaceMemorySessionId(nodeToExecuteData.instance, chatId)
const nodeInstanceFilePath = this.nodesPool.componentNodes[nodeToExecuteData.name].filePath as string
const nodeModule = await import(nodeInstanceFilePath)
const nodeInstance = new nodeModule.nodeClass({ sessionId })
let result = isStreamValid
? await nodeInstance.run(nodeToExecuteData, incomingInput.question, {
chatId,
chatflowid,
chatHistory: incomingInput.history,
socketIO,
socketIOClientId: incomingInput.socketIOClientId,
logger,
appDataSource: this.AppDataSource,
databaseEntities,
analytic: chatflow.analytic,
chatId
socketIO,
socketIOClientId: incomingInput.socketIOClientId
})
: await nodeInstance.run(nodeToExecuteData, incomingInput.question, {
chatId,
chatflowid,
chatHistory: incomingInput.history,
logger,
appDataSource: this.AppDataSource,
databaseEntities,
analytic: chatflow.analytic,
chatId
analytic: chatflow.analytic
})
result = typeof result === 'string' ? { text: result } : result
@@ -1498,9 +1795,6 @@ export class App {
sessionId = result.assistant.threadId
}
const memoryNode = this.findMemoryLabel(nodes, edges)
const memoryType = memoryNode?.data.label
const userMessage: Omit<IChatMessage, 'id'> = {
role: 'userMessage',
content: incomingInput.question,
@@ -1533,9 +1827,18 @@ export class App {
await this.addChatMessage(apiMessage)
logger.debug(`[server]: Finished running ${nodeToExecuteData.label} (${nodeToExecuteData.id})`)
await this.telemetry.sendTelemetry('prediction_sent', {
version: await getAppVersion(),
chatlowId: chatflowid,
chatId,
type: isInternal ? chatType.INTERNAL : chatType.EXTERNAL,
flowGraph: getTelemetryFlowObj(nodes, edges)
})
// Only return ChatId when its Internal OR incoming input has ChatId, to avoid confusion when calling API
if (incomingInput.chatId || isInternal) result.chatId = chatId
// Prepare response
result.chatId = chatId
if (sessionId) result.sessionId = sessionId
if (memoryType) result.memoryType = memoryType
return res.json(result)
} catch (e: any) {
@@ -1547,6 +1850,7 @@ export class App {
async stopApp() {
try {
const removePromises: any[] = []
removePromises.push(this.telemetry.flush())
await Promise.all(removePromises)
} catch (e) {
logger.error(`❌[server]: Flowise Server shut down error: ${e}`)
@@ -1554,23 +1858,6 @@ export class App {
}
}
/**
* Get first chat message id
* @param {string} chatflowid
* @returns {string}
*/
export async function getChatId(chatflowid: string): Promise<string> {
// first chatmessage id as the unique chat id
const firstChatMessage = await getDataSource()
.getRepository(ChatMessage)
.createQueryBuilder('cm')
.select('cm.id')
.where('chatflowid = :chatflowid', { chatflowid })
.orderBy('cm.createdDate', 'ASC')
.getOne()
return firstChatMessage ? firstChatMessage.id : ''
}
let serverApp: App | undefined
export async function getAllChatFlow(): Promise<IChatFlow[]> {
+20
View File
@@ -0,0 +1,20 @@
import { Request, Response, NextFunction } from 'express'
import sanitizeHtml from 'sanitize-html'
export function sanitizeMiddleware(req: Request, res: Response, next: NextFunction): void {
// decoding is necessary as the url is encoded by the browser
const decodedURI = decodeURI(req.url)
req.url = sanitizeHtml(decodedURI)
for (let p in req.query) {
if (Array.isArray(req.query[p])) {
const sanitizedQ = []
for (const q of req.query[p] as string[]) {
sanitizedQ.push(sanitizeHtml(q))
}
req.query[p] = sanitizedQ
} else {
req.query[p] = sanitizeHtml(req.query[p] as string)
}
}
next()
}
+36
View File
@@ -0,0 +1,36 @@
export function parsePrompt(prompt: string): any[] {
const promptObj = JSON.parse(prompt)
let response = []
if (promptObj.kwargs.messages) {
promptObj.kwargs.messages.forEach((message: any) => {
let messageType = message.id.includes('SystemMessagePromptTemplate')
? 'systemMessagePrompt'
: message.id.includes('HumanMessagePromptTemplate')
? 'humanMessagePrompt'
: message.id.includes('AIMessagePromptTemplate')
? 'aiMessagePrompt'
: 'template'
let messageTypeDisplay = message.id.includes('SystemMessagePromptTemplate')
? 'System Message'
: message.id.includes('HumanMessagePromptTemplate')
? 'Human Message'
: message.id.includes('AIMessagePromptTemplate')
? 'AI Message'
: 'Message'
let template = message.kwargs.prompt.kwargs.template
response.push({
type: messageType,
typeDisplay: messageTypeDisplay,
template: template
})
})
} else if (promptObj.kwargs.template) {
let template = promptObj.kwargs.template
response.push({
type: 'template',
typeDisplay: 'Prompt',
template: template
})
}
return response
}
+314 -58
View File
@@ -23,9 +23,11 @@ import {
convertChatHistoryToText,
getInputVariables,
handleEscapeCharacters,
getEncryptionKeyPath,
ICommonObject,
IDatabaseEntity,
IMessage
IMessage,
FlowiseMemory
} from 'flowise-components'
import { randomBytes } from 'crypto'
import { AES, enc } from 'crypto-js'
@@ -37,6 +39,7 @@ import { Tool } from '../database/entities/Tool'
import { Assistant } from '../database/entities/Assistant'
import { DataSource } from 'typeorm'
import { CachePool } from '../CachePool'
import { Variable } from '../database/entities/Variable'
const QUESTION_VAR_PREFIX = 'question'
const CHAT_HISTORY_VAR_PREFIX = 'chat_history'
@@ -47,7 +50,8 @@ export const databaseEntities: IDatabaseEntity = {
ChatMessage: ChatMessage,
Tool: Tool,
Credential: Credential,
Assistant: Assistant
Assistant: Assistant,
Variable: Variable
}
/**
@@ -95,9 +99,13 @@ export const getNodeModulesPackagePath = (packageName: string): string => {
* Construct graph and node dependencies score
* @param {IReactFlowNode[]} reactFlowNodes
* @param {IReactFlowEdge[]} reactFlowEdges
* @param {boolean} isNondirected
* @param {{ isNonDirected?: boolean, isReversed?: boolean }} options
*/
export const constructGraphs = (reactFlowNodes: IReactFlowNode[], reactFlowEdges: IReactFlowEdge[], isNondirected = false) => {
export const constructGraphs = (
reactFlowNodes: IReactFlowNode[],
reactFlowEdges: IReactFlowEdge[],
options?: { isNonDirected?: boolean; isReversed?: boolean }
) => {
const nodeDependencies = {} as INodeDependencies
const graph = {} as INodeDirectedGraph
@@ -107,6 +115,23 @@ export const constructGraphs = (reactFlowNodes: IReactFlowNode[], reactFlowEdges
graph[nodeId] = []
}
if (options && options.isReversed) {
for (let i = 0; i < reactFlowEdges.length; i += 1) {
const source = reactFlowEdges[i].source
const target = reactFlowEdges[i].target
if (Object.prototype.hasOwnProperty.call(graph, target)) {
graph[target].push(source)
} else {
graph[target] = [source]
}
nodeDependencies[target] += 1
}
return { graph, nodeDependencies }
}
for (let i = 0; i < reactFlowEdges.length; i += 1) {
const source = reactFlowEdges[i].source
const target = reactFlowEdges[i].target
@@ -117,7 +142,7 @@ export const constructGraphs = (reactFlowNodes: IReactFlowNode[], reactFlowEdges
graph[source] = [target]
}
if (isNondirected) {
if (options && options.isNonDirected) {
if (Object.prototype.hasOwnProperty.call(graph, target)) {
graph[target].push(source)
} else {
@@ -179,21 +204,49 @@ export const getStartingNodes = (graph: INodeDirectedGraph, endNodeId: string) =
return { startingNodeIds, depthQueue: depthQueueReversed }
}
/**
* Get all connected nodes from startnode
* @param {INodeDependencies} graph
* @param {string} startNodeId
*/
export const getAllConnectedNodes = (graph: INodeDirectedGraph, startNodeId: string) => {
const visited = new Set<string>()
const queue: Array<[string]> = [[startNodeId]]
while (queue.length > 0) {
const [currentNode] = queue.shift()!
if (visited.has(currentNode)) {
continue
}
visited.add(currentNode)
for (const neighbor of graph[currentNode]) {
if (!visited.has(neighbor)) {
queue.push([neighbor])
}
}
}
return [...visited]
}
/**
* Get ending node and check if flow is valid
* @param {INodeDependencies} nodeDependencies
* @param {INodeDirectedGraph} graph
*/
export const getEndingNode = (nodeDependencies: INodeDependencies, graph: INodeDirectedGraph) => {
let endingNodeId = ''
export const getEndingNodes = (nodeDependencies: INodeDependencies, graph: INodeDirectedGraph) => {
const endingNodeIds: string[] = []
Object.keys(graph).forEach((nodeId) => {
if (Object.keys(nodeDependencies).length === 1) {
endingNodeId = nodeId
endingNodeIds.push(nodeId)
} else if (!graph[nodeId].length && nodeDependencies[nodeId] > 0) {
endingNodeId = nodeId
endingNodeIds.push(nodeId)
}
})
return endingNodeId
return endingNodeIds
}
/**
@@ -213,12 +266,14 @@ export const getEndingNode = (nodeDependencies: INodeDependencies, graph: INodeD
export const buildLangchain = async (
startingNodeIds: string[],
reactFlowNodes: IReactFlowNode[],
reactFlowEdges: IReactFlowEdge[],
graph: INodeDirectedGraph,
depthQueue: IDepthQueue,
componentNodes: IComponentNodes,
question: string,
chatHistory: IMessage[],
chatId: string,
sessionId: string,
chatflowid: string,
appDataSource: DataSource,
overrideConfig?: ICommonObject,
@@ -231,6 +286,8 @@ export const buildLangchain = async (
// Create a Queue and add our initial node in it
const nodeQueue = [] as INodeQueue[]
const exploredNode = {} as IExploredNode
const dynamicVariables = {} as Record<string, unknown>
let ignoreNodeIds: string[] = []
// In the case of infinite loop, only max 3 loops will be executed
const maxLoop = 3
@@ -256,31 +313,72 @@ export const buildLangchain = async (
if (overrideConfig) flowNodeData = replaceInputsWithConfig(flowNodeData, overrideConfig)
const reactFlowNodeData: INodeData = resolveVariables(flowNodeData, flowNodes, question, chatHistory)
if (
isUpsert &&
((stopNodeId && reactFlowNodeData.id === stopNodeId) || (!stopNodeId && reactFlowNodeData.category === 'Vector Stores'))
) {
// TODO: Avoid processing Text Splitter + Doc Loader once Upsert & Load Existing Vector Nodes are deprecated
if (isUpsert && stopNodeId && nodeId === stopNodeId) {
logger.debug(`[server]: Upserting ${reactFlowNode.data.label} (${reactFlowNode.data.id})`)
await newNodeInstance.vectorStoreMethods!['upsert']!.call(newNodeInstance, reactFlowNodeData, {
chatId,
sessionId,
chatflowid,
chatHistory,
logger,
appDataSource,
databaseEntities,
logger,
cachePool
cachePool,
dynamicVariables
})
logger.debug(`[server]: Finished upserting ${reactFlowNode.data.label} (${reactFlowNode.data.id})`)
break
} else {
logger.debug(`[server]: Initializing ${reactFlowNode.data.label} (${reactFlowNode.data.id})`)
flowNodes[nodeIndex].data.instance = await newNodeInstance.init(reactFlowNodeData, question, {
let outputResult = await newNodeInstance.init(reactFlowNodeData, question, {
chatId,
sessionId,
chatflowid,
chatHistory,
logger,
appDataSource,
databaseEntities,
logger,
cachePool
cachePool,
dynamicVariables
})
// Save dynamic variables
if (reactFlowNode.data.name === 'setVariable') {
const dynamicVars = outputResult?.dynamicVariables ?? {}
for (const variableKey in dynamicVars) {
dynamicVariables[variableKey] = dynamicVars[variableKey]
}
outputResult = outputResult?.output
}
// Determine which nodes to route next when it comes to ifElse
if (reactFlowNode.data.name === 'ifElseFunction' && typeof outputResult === 'object') {
let sourceHandle = ''
if (outputResult.type === true) {
sourceHandle = `${nodeId}-output-returnFalse-string|number|boolean|json|array`
} else if (outputResult.type === false) {
sourceHandle = `${nodeId}-output-returnTrue-string|number|boolean|json|array`
}
const ifElseEdge = reactFlowEdges.find((edg) => edg.source === nodeId && edg.sourceHandle === sourceHandle)
if (ifElseEdge) {
const { graph } = constructGraphs(
reactFlowNodes,
reactFlowEdges.filter((edg) => !(edg.source === nodeId && edg.sourceHandle === sourceHandle)),
{ isNonDirected: true }
)
ignoreNodeIds.push(ifElseEdge.target, ...getAllConnectedNodes(graph, ifElseEdge.target))
ignoreNodeIds = [...new Set(ignoreNodeIds)]
}
outputResult = outputResult?.output
}
flowNodes[nodeIndex].data.instance = outputResult
logger.debug(`[server]: Finished initializing ${reactFlowNode.data.label} (${reactFlowNode.data.id})`)
}
} catch (e: any) {
@@ -288,7 +386,7 @@ export const buildLangchain = async (
throw new Error(e)
}
const neighbourNodeIds = graph[nodeId]
let neighbourNodeIds = graph[nodeId]
const nextDepth = depth + 1
// Find other nodes that are on the same depth level
@@ -299,9 +397,11 @@ export const buildLangchain = async (
neighbourNodeIds.push(id)
}
neighbourNodeIds = neighbourNodeIds.filter((neigh) => !ignoreNodeIds.includes(neigh))
for (let i = 0; i < neighbourNodeIds.length; i += 1) {
const neighNodeId = neighbourNodeIds[i]
if (ignoreNodeIds.includes(neighNodeId)) continue
// If nodeId has been seen, cycle detected
if (Object.prototype.hasOwnProperty.call(exploredNode, neighNodeId)) {
const { remainingLoop, lastSeenDepth } = exploredNode[neighNodeId]
@@ -319,6 +419,12 @@ export const buildLangchain = async (
nodeQueue.push({ nodeId: neighNodeId, depth: nextDepth })
}
}
// Move end node to last
if (!neighbourNodeIds.length) {
const index = flowNodes.findIndex((nd) => nd.data.id === nodeId)
flowNodes.push(flowNodes.splice(index, 1)[0])
}
}
return flowNodes
}
@@ -351,15 +457,25 @@ export const clearSessionMemory = async (
const nodeInstanceFilePath = componentNodes[node.data.name].filePath as string
const nodeModule = await import(nodeInstanceFilePath)
const newNodeInstance = new nodeModule.nodeClass()
const options: ICommonObject = { chatId, appDataSource, databaseEntities, logger }
// SessionId always take priority first because it is the sessionId used for 3rd party memory node
if (sessionId && node.data.inputs) {
node.data.inputs.sessionId = sessionId
}
const initializedInstance = await newNodeInstance.init(node.data, '', { chatId, appDataSource, databaseEntities, logger })
if (initializedInstance.clearChatMessages) {
await initializedInstance.clearChatMessages()
if (node.data.type === 'OpenAIAssistant') {
await newNodeInstance.clearChatMessages(node.data, options, { type: 'threadId', id: sessionId })
} else {
node.data.inputs.sessionId = sessionId
const initializedInstance: FlowiseMemory = await newNodeInstance.init(node.data, '', options)
await initializedInstance.clearChatMessages(sessionId)
}
} else if (chatId && node.data.inputs) {
if (node.data.type === 'OpenAIAssistant') {
await newNodeInstance.clearChatMessages(node.data, options, { type: 'chatId', id: chatId })
} else {
node.data.inputs.sessionId = chatId
const initializedInstance: FlowiseMemory = await newNodeInstance.init(node.data, '', options)
await initializedInstance.clearChatMessages(chatId)
}
}
}
}
@@ -434,7 +550,11 @@ export const getVariableValue = (
variablePaths.forEach((path) => {
const variableValue = variableDict[path]
// Replace all occurrence
returnVal = returnVal.split(path).join(variableValue)
if (typeof variableValue === 'object') {
returnVal = returnVal.split(path).join(JSON.stringify(variableValue).replace(/"/g, '\\"'))
} else {
returnVal = returnVal.split(path).join(variableValue)
}
})
return returnVal
}
@@ -533,7 +653,18 @@ export const isStartNodeDependOnInput = (startingNodes: IReactFlowNode[], nodes:
}
const whitelistNodeNames = ['vectorStoreToDocument', 'autoGPT', 'chatPromptTemplate', 'promptTemplate'] //If these nodes are found, chatflow cannot be reused
for (const node of nodes) {
if (whitelistNodeNames.includes(node.data.name)) return true
if (node.data.name === 'chatPromptTemplate' || node.data.name === 'promptTemplate') {
let promptValues: ICommonObject = {}
const promptValuesRaw = node.data.inputs?.promptValues
if (promptValuesRaw) {
try {
promptValues = typeof promptValuesRaw === 'object' ? promptValuesRaw : JSON.parse(promptValuesRaw)
} catch (exception) {
console.error(exception)
}
}
if (getAllValuesFromJson(promptValues).includes(`{{${QUESTION_VAR_PREFIX}}}`)) return true
} else if (whitelistNodeNames.includes(node.data.name)) return true
}
return false
}
@@ -673,6 +804,7 @@ export const findAvailableConfigs = (reactFlowNodes: IReactFlowNode[], component
/**
* Check to see if flow valid for stream
* TODO: perform check from component level. i.e: set streaming on component, and check here
* @param {IReactFlowNode[]} reactFlowNodes
* @param {INodeData} endingNodeData
* @returns {boolean}
@@ -686,7 +818,8 @@ export const isFlowValidForStream = (reactFlowNodes: IReactFlowNode[], endingNod
'chatAnthropic',
'chatAnthropic_LlamaIndex',
'chatOllama',
'awsChatBedrock'
'awsChatBedrock',
'chatMistralAI'
],
LLMs: ['azureOpenAI', 'openAI', 'ollama']
}
@@ -711,7 +844,7 @@ export const isFlowValidForStream = (reactFlowNodes: IReactFlowNode[], endingNod
const whitelistAgents = ['openAIFunctionAgent', 'csvAgent', 'airtableAgent', 'conversationalRetrievalAgent']
isValidChainOrAgent = whitelistAgents.includes(endingNodeData.name)
} else if (endingNodeData.category === 'Engine') {
const whitelistEngine = ['contextChatEngine', 'simpleChatEngine']
const whitelistEngine = ['contextChatEngine', 'simpleChatEngine', 'queryEngine']
isValidChainOrAgent = whitelistEngine.includes(endingNodeData.name)
}
@@ -727,16 +860,6 @@ export const isFlowValidForStream = (reactFlowNodes: IReactFlowNode[], endingNod
return isChatOrLLMsExist && isValidChainOrAgent && !isOutputParserExist
}
/**
* Returns the path of encryption key
* @returns {string}
*/
export const getEncryptionKeyPath = (): string => {
return process.env.SECRETKEY_PATH
? path.join(process.env.SECRETKEY_PATH, 'encryption.key')
: path.join(__dirname, '..', '..', 'encryption.key')
}
/**
* Generate an encryption key
* @returns {string}
@@ -757,7 +880,10 @@ export const getEncryptionKey = async (): Promise<string> => {
return await fs.promises.readFile(getEncryptionKeyPath(), 'utf8')
} catch (error) {
const encryptKey = generateEncryptKey()
await fs.promises.writeFile(getEncryptionKeyPath(), encryptKey)
const defaultLocation = process.env.SECRETKEY_PATH
? path.join(process.env.SECRETKEY_PATH, 'encryption.key')
: path.join(getUserHome(), '.flowise', 'encryption.key')
await fs.promises.writeFile(defaultLocation, encryptKey)
return encryptKey
}
}
@@ -845,21 +971,43 @@ export const redactCredentialWithPasswordType = (
}
/**
* Replace sessionId with new chatId
* Ex: after clear chat history, use the new chatId as sessionId
* Get sessionId
* Hierarchy of sessionId (top down)
* API/Embed:
* (1) Provided in API body - incomingInput.overrideConfig: { sessionId: 'abc' }
* (2) Provided in API body - incomingInput.chatId
*
* API/Embed + UI:
* (3) Hard-coded sessionId in UI
* (4) Not specified on UI nor API, default to chatId
* @param {any} instance
* @param {IncomingInput} incomingInput
* @param {string} chatId
*/
export const replaceMemorySessionId = (instance: any, chatId: string): string | undefined => {
if (instance.memory && instance.memory.isSessionIdUsingChatMessageId && chatId) {
instance.memory.sessionId = chatId
instance.memory.chatHistory.sessionId = chatId
export const getMemorySessionId = (
memoryNode: IReactFlowNode,
incomingInput: IncomingInput,
chatId: string,
isInternal: boolean
): string | undefined => {
if (!isInternal) {
// Provided in API body - incomingInput.overrideConfig: { sessionId: 'abc' }
if (incomingInput.overrideConfig?.sessionId) {
return incomingInput.overrideConfig?.sessionId
}
// Provided in API body - incomingInput.chatId
if (incomingInput.chatId) {
return incomingInput.chatId
}
}
if (instance.memory && instance.memory.sessionId) return instance.memory.sessionId
else if (instance.memory && instance.memory.chatHistory && instance.memory.chatHistory.sessionId)
return instance.memory.chatHistory.sessionId
return undefined
// Hard-coded sessionId in UI
if (memoryNode.data.inputs?.sessionId) {
return memoryNode.data.inputs.sessionId
}
// Default chatId
return chatId
}
/**
@@ -871,27 +1019,135 @@ export const replaceMemorySessionId = (instance: any, chatId: string): string |
* @param {any} logger
* @returns {string}
*/
export const replaceChatHistory = async (
export const getSessionChatHistory = async (
memoryNode: IReactFlowNode,
componentNodes: IComponentNodes,
incomingInput: IncomingInput,
appDataSource: DataSource,
databaseEntities: IDatabaseEntity,
logger: any
): Promise<IMessage[]> => {
const nodeInstanceFilePath = memoryNode.data.filePath as string
const nodeInstanceFilePath = componentNodes[memoryNode.data.name].filePath as string
const nodeModule = await import(nodeInstanceFilePath)
const newNodeInstance = new nodeModule.nodeClass()
// Replace memory's sessionId/chatId
if (incomingInput.overrideConfig?.sessionId && memoryNode.data.inputs) {
memoryNode.data.inputs.sessionId = incomingInput.overrideConfig.sessionId
} else if (incomingInput.chatId && memoryNode.data.inputs) {
memoryNode.data.inputs.sessionId = incomingInput.chatId
}
const initializedInstance = await newNodeInstance.init(memoryNode.data, '', {
chatId: incomingInput.chatId,
const initializedInstance: FlowiseMemory = await newNodeInstance.init(memoryNode.data, '', {
appDataSource,
databaseEntities,
logger
})
return await initializedInstance.getChatMessages()
return (await initializedInstance.getChatMessages()) as IMessage[]
}
/**
* Method that find memory that is connected within chatflow
* In a chatflow, there should only be 1 memory node
* @param {IReactFlowNode[]} nodes
* @param {IReactFlowEdge[]} edges
* @returns {string | undefined}
*/
export const findMemoryNode = (nodes: IReactFlowNode[], edges: IReactFlowEdge[]): IReactFlowNode | undefined => {
const memoryNodes = nodes.filter((node) => node.data.category === 'Memory')
const memoryNodeIds = memoryNodes.map((mem) => mem.data.id)
for (const edge of edges) {
if (memoryNodeIds.includes(edge.source)) {
const memoryNode = nodes.find((node) => node.data.id === edge.source)
return memoryNode
}
}
return undefined
}
/**
* Get all values from a JSON object
* @param {any} obj
* @returns {any[]}
*/
export const getAllValuesFromJson = (obj: any): any[] => {
const values: any[] = []
function extractValues(data: any) {
if (typeof data === 'object' && data !== null) {
if (Array.isArray(data)) {
for (const item of data) {
extractValues(item)
}
} else {
for (const key in data) {
extractValues(data[key])
}
}
} else {
values.push(data)
}
}
extractValues(obj)
return values
}
/**
* Get only essential flow data items for telemetry
* @param {IReactFlowNode[]} nodes
* @param {IReactFlowEdge[]} edges
*/
export const getTelemetryFlowObj = (nodes: IReactFlowNode[], edges: IReactFlowEdge[]) => {
const nodeData = nodes.map((node) => node.id)
const edgeData = edges.map((edge) => ({ source: edge.source, target: edge.target }))
return { nodes: nodeData, edges: edgeData }
}
/**
* Get user settings file
* TODO: move env variables to settings json file, easier configuration
*/
export const getUserSettingsFilePath = () => {
if (process.env.SECRETKEY_PATH) return path.join(process.env.SECRETKEY_PATH, 'settings.json')
const checkPaths = [path.join(getUserHome(), '.flowise', 'settings.json')]
for (const checkPath of checkPaths) {
if (fs.existsSync(checkPath)) {
return checkPath
}
}
return ''
}
/**
* Get app current version
*/
export const getAppVersion = async () => {
const getPackageJsonPath = (): string => {
const checkPaths = [
path.join(__dirname, '..', 'package.json'),
path.join(__dirname, '..', '..', 'package.json'),
path.join(__dirname, '..', '..', '..', 'package.json'),
path.join(__dirname, '..', '..', '..', '..', 'package.json'),
path.join(__dirname, '..', '..', '..', '..', '..', 'package.json')
]
for (const checkPath of checkPaths) {
if (fs.existsSync(checkPath)) {
return checkPath
}
}
return ''
}
const packagejsonPath = getPackageJsonPath()
if (!packagejsonPath) return ''
try {
const content = await fs.promises.readFile(packagejsonPath, 'utf8')
const parsedContent = JSON.parse(content)
return parsedContent.version
} catch (error) {
return ''
}
}
+52
View File
@@ -0,0 +1,52 @@
import { v4 as uuidv4 } from 'uuid'
import { PostHog } from 'posthog-node'
import path from 'path'
import fs from 'fs'
import { getUserHome, getUserSettingsFilePath } from '.'
export class Telemetry {
postHog?: PostHog
constructor() {
if (process.env.DISABLE_FLOWISE_TELEMETRY !== 'true') {
this.postHog = new PostHog('phc_jEDuFYnOnuXsws986TLWzuisbRjwFqTl9JL8tDMgqme')
} else {
this.postHog = undefined
}
}
async id(): Promise<string> {
try {
const settingsContent = await fs.promises.readFile(getUserSettingsFilePath(), 'utf8')
const settings = JSON.parse(settingsContent)
return settings.instanceId
} catch (error) {
const instanceId = uuidv4()
const settings = {
instanceId
}
const defaultLocation = process.env.SECRETKEY_PATH
? path.join(process.env.SECRETKEY_PATH, 'settings.json')
: path.join(getUserHome(), '.flowise', 'settings.json')
await fs.promises.writeFile(defaultLocation, JSON.stringify(settings, null, 2))
return instanceId
}
}
async sendTelemetry(event: string, properties = {}): Promise<void> {
if (this.postHog) {
const distinctId = await this.id()
this.postHog.capture({
event,
distinctId,
properties
})
}
}
async flush(): Promise<void> {
if (this.postHog) {
await this.postHog.shutdownAsync()
}
}
}