Merge branch 'main' into FEATURE/Vision

# Conflicts:
#	packages/components/nodes/chains/ConversationChain/ConversationChain.ts
#	packages/server/src/index.ts
#	packages/server/src/utils/index.ts
This commit is contained in:
Henry
2024-02-02 02:54:06 +00:00
136 changed files with 5054 additions and 2019 deletions
+22 -4
View File
@@ -1,5 +1,6 @@
import 'reflect-metadata'
import path from 'path'
import * as fs from 'fs'
import { DataSource } from 'typeorm'
import { getUserHome } from './utils'
import { entities } from './database/entities'
@@ -11,9 +12,13 @@ let appDataSource: DataSource
export const init = async (): Promise<void> => {
let homePath
let flowisePath = path.join(getUserHome(), '.flowise')
if (!fs.existsSync(flowisePath)) {
fs.mkdirSync(flowisePath)
}
switch (process.env.DATABASE_TYPE) {
case 'sqlite':
homePath = process.env.DATABASE_PATH ?? path.join(getUserHome(), '.flowise')
homePath = process.env.DATABASE_PATH ?? flowisePath
appDataSource = new DataSource({
type: 'sqlite',
database: path.resolve(homePath, 'database.sqlite'),
@@ -35,7 +40,8 @@ export const init = async (): Promise<void> => {
synchronize: false,
migrationsRun: false,
entities: Object.values(entities),
migrations: mysqlMigrations
migrations: mysqlMigrations,
ssl: getDatabaseSSLFromEnv()
})
break
case 'postgres':
@@ -46,7 +52,7 @@ export const init = async (): Promise<void> => {
username: process.env.DATABASE_USER,
password: process.env.DATABASE_PASSWORD,
database: process.env.DATABASE_NAME,
ssl: process.env.DATABASE_SSL === 'true',
ssl: getDatabaseSSLFromEnv(),
synchronize: false,
migrationsRun: false,
entities: Object.values(entities),
@@ -54,7 +60,7 @@ export const init = async (): Promise<void> => {
})
break
default:
homePath = process.env.DATABASE_PATH ?? path.join(getUserHome(), '.flowise')
homePath = process.env.DATABASE_PATH ?? flowisePath
appDataSource = new DataSource({
type: 'sqlite',
database: path.resolve(homePath, 'database.sqlite'),
@@ -73,3 +79,15 @@ export function getDataSource(): DataSource {
}
return appDataSource
}
const getDatabaseSSLFromEnv = () => {
if (process.env.DATABASE_SSL_KEY_BASE64) {
return {
rejectUnauthorized: false,
ca: Buffer.from(process.env.DATABASE_SSL_KEY_BASE64, 'base64')
}
} else if (process.env.DATABASE_SSL === 'true') {
return true
}
return undefined
}
+2 -2
View File
@@ -1,4 +1,4 @@
import { ICommonObject, INode, INodeData as INodeDataFromComponent, INodeParams } from 'flowise-components'
import { ICommonObject, IFileUpload, INode, INodeData as INodeDataFromComponent, INodeParams } from 'flowise-components'
export type MessageType = 'apiMessage' | 'userMessage'
@@ -177,7 +177,7 @@ export interface IncomingInput {
socketIOClientId?: string
chatId?: string
stopNodeId?: string
uploads?: string
uploads?: IFileUpload[]
}
export interface IActiveChatflows {
+11 -1
View File
@@ -19,6 +19,8 @@ export default class Start extends Command {
FLOWISE_USERNAME: Flags.string(),
FLOWISE_PASSWORD: Flags.string(),
PORT: Flags.string(),
CORS_ORIGINS: Flags.string(),
IFRAME_ORIGINS: Flags.string(),
DEBUG: Flags.string(),
APIKEY_PATH: Flags.string(),
SECRETKEY_PATH: Flags.string(),
@@ -36,10 +38,12 @@ export default class Start extends Command {
DATABASE_USER: Flags.string(),
DATABASE_PASSWORD: Flags.string(),
DATABASE_SSL: Flags.string(),
DATABASE_SSL_KEY_BASE64: Flags.string(),
LANGCHAIN_TRACING_V2: Flags.string(),
LANGCHAIN_ENDPOINT: Flags.string(),
LANGCHAIN_API_KEY: Flags.string(),
LANGCHAIN_PROJECT: Flags.string()
LANGCHAIN_PROJECT: Flags.string(),
DISABLE_FLOWISE_TELEMETRY: Flags.string()
}
async stopProcess() {
@@ -77,6 +81,8 @@ export default class Start extends Command {
const { flags } = await this.parse(Start)
if (flags.PORT) process.env.PORT = flags.PORT
if (flags.CORS_ORIGINS) process.env.CORS_ORIGINS = flags.CORS_ORIGINS
if (flags.IFRAME_ORIGINS) process.env.IFRAME_ORIGINS = flags.IFRAME_ORIGINS
if (flags.DEBUG) process.env.DEBUG = flags.DEBUG
if (flags.NUMBER_OF_PROXIES) process.env.NUMBER_OF_PROXIES = flags.NUMBER_OF_PROXIES
@@ -106,6 +112,7 @@ export default class Start extends Command {
if (flags.DATABASE_USER) process.env.DATABASE_USER = flags.DATABASE_USER
if (flags.DATABASE_PASSWORD) process.env.DATABASE_PASSWORD = flags.DATABASE_PASSWORD
if (flags.DATABASE_SSL) process.env.DATABASE_SSL = flags.DATABASE_SSL
if (flags.DATABASE_SSL_KEY_BASE64) process.env.DATABASE_SSL_KEY_BASE64 = flags.DATABASE_SSL_KEY_BASE64
// Langsmith tracing
if (flags.LANGCHAIN_TRACING_V2) process.env.LANGCHAIN_TRACING_V2 = flags.LANGCHAIN_TRACING_V2
@@ -113,6 +120,9 @@ export default class Start extends Command {
if (flags.LANGCHAIN_API_KEY) process.env.LANGCHAIN_API_KEY = flags.LANGCHAIN_API_KEY
if (flags.LANGCHAIN_PROJECT) process.env.LANGCHAIN_PROJECT = flags.LANGCHAIN_PROJECT
// Telemetry
if (flags.DISABLE_FLOWISE_TELEMETRY) process.env.DISABLE_FLOWISE_TELEMETRY = flags.DISABLE_FLOWISE_TELEMETRY
await (async () => {
try {
logger.info('Starting Flowise...')
+208 -78
View File
@@ -47,7 +47,9 @@ import {
getAllConnectedNodes,
clearSessionMemory,
findMemoryNode,
deleteFolderRecursive
deleteFolderRecursive,
getTelemetryFlowObj,
getAppVersion
} from './utils'
import { cloneDeep, omit, uniqWith, isEqual } from 'lodash'
import { getDataSource } from './DataSource'
@@ -66,14 +68,18 @@ import {
INodeParams,
handleEscapeCharacters,
convertSpeechToText,
xmlScrape,
webCrawl,
getStoragePath,
IFileUpload
} from 'flowise-components'
import { createRateLimiter, getRateLimiter, initializeRateLimiter } from './utils/rateLimit'
import { addAPIKey, compareKeys, deleteAPIKey, getApiKey, getAPIKeys, updateAPIKey } from './utils/apiKey'
import { sanitizeMiddleware } from './utils/XSS'
import { sanitizeMiddleware, getCorsOptions, getAllowedIframeOrigins } from './utils/XSS'
import axios from 'axios'
import { Client } from 'langchainhub'
import { parsePrompt } from './utils/hub'
import { Telemetry } from './utils/telemetry'
import { Variable } from './database/entities/Variable'
export class App {
@@ -81,6 +87,7 @@ export class App {
nodesPool: NodesPool
chatflowPool: ChatflowPool
cachePool: CachePool
telemetry: Telemetry
AppDataSource = getDataSource()
constructor() {
@@ -115,6 +122,9 @@ export class App {
// Initialize cache pool
this.cachePool = new CachePool()
// Initialize telemetry
this.telemetry = new Telemetry()
})
.catch((err) => {
logger.error('❌ [server]: Error during Data Source initialization:', err)
@@ -129,8 +139,20 @@ export class App {
if (process.env.NUMBER_OF_PROXIES && parseInt(process.env.NUMBER_OF_PROXIES) > 0)
this.app.set('trust proxy', parseInt(process.env.NUMBER_OF_PROXIES))
// Allow access from *
this.app.use(cors())
// Allow access from specified domains
this.app.use(cors(getCorsOptions()))
// Allow embedding from specified domains.
this.app.use((req, res, next) => {
const allowedOrigins = getAllowedIframeOrigins()
if (allowedOrigins == '*') {
next()
} else {
const csp = `frame-ancestors ${allowedOrigins}`
res.setHeader('Content-Security-Policy', csp)
next()
}
})
// Switch off the default 'X-Powered-By: Express' header
this.app.disable('x-powered-by')
@@ -306,7 +328,13 @@ export class App {
const nodeModule = await import(nodeInstanceFilePath)
const newNodeInstance = new nodeModule.nodeClass()
const returnData = await newNodeInstance.init(nodeData)
const options: ICommonObject = {
appDataSource: this.AppDataSource,
databaseEntities,
logger
}
const returnData = await newNodeInstance.init(nodeData, '', options)
const result = typeof returnData === 'string' ? handleEscapeCharacters(returnData, true) : returnData
return res.json(result)
@@ -394,6 +422,12 @@ export class App {
const chatflow = this.AppDataSource.getRepository(ChatFlow).create(newChatFlow)
const results = await this.AppDataSource.getRepository(ChatFlow).save(chatflow)
await this.telemetry.sendTelemetry('chatflow_created', {
version: await getAppVersion(),
chatlowId: results.id,
flowGraph: getTelemetryFlowObj(JSON.parse(results.flowData)?.nodes, JSON.parse(results.flowData)?.edges)
})
return res.json(results)
})
@@ -433,11 +467,11 @@ export class App {
const results = await this.AppDataSource.getRepository(ChatFlow).delete({ id: req.params.id })
try {
/* Delete all multimodal uploads corresponding to this chatflow */
const directory = path.join(getUserHome(), '.flowise', 'gptvision', req.params.id)
// Delete all uploads corresponding to this chatflow
const directory = path.join(getStoragePath(), req.params.id)
deleteFolderRecursive(directory)
} catch (e) {
logger.error(`[server]: Error deleting multimodal uploads: ${e}`)
logger.error(`[server]: Error deleting file storage for chatflow ${req.params.id}: ${e}`)
}
return res.json(results)
@@ -463,18 +497,25 @@ export class App {
const endingNodes = nodes.filter((nd) => endingNodeIds.includes(nd.id))
let isStreaming = false
let isEndingNodeExists = endingNodes.find((node) => node.data?.outputs?.output === 'EndingNode')
for (const endingNode of endingNodes) {
const endingNodeData = endingNode.data
if (!endingNodeData) return res.status(500).send(`Ending node ${endingNode.id} data not found`)
if (endingNodeData && endingNodeData.category !== 'Chains' && endingNodeData.category !== 'Agents') {
return res.status(500).send(`Ending node must be either a Chain or Agent`)
const isEndingNode = endingNodeData?.outputs?.output === 'EndingNode'
if (!isEndingNode) {
if (endingNodeData && endingNodeData.category !== 'Chains' && endingNodeData.category !== 'Agents') {
return res.status(500).send(`Ending node must be either a Chain or Agent`)
}
}
isStreaming = isFlowValidForStream(nodes, endingNodeData)
isStreaming = isEndingNode ? false : isFlowValidForStream(nodes, endingNodeData)
}
const obj = { isStreaming }
// Once custom function ending node exists, flow is always unavailable to stream
const obj = { isStreaming: isEndingNodeExists ? false : isStreaming }
return res.json(obj)
})
@@ -490,43 +531,55 @@ export class App {
try {
const flowObj = JSON.parse(chatflow.flowData)
const allowances: IUploadFileSizeAndTypes[] = []
let allowSpeechToText = false
const imgUploadSizeAndTypes: IUploadFileSizeAndTypes[] = []
let isSpeechToTextEnabled = false
if (chatflow.speechToText) {
const speechToTextProviders = JSON.parse(chatflow.speechToText)
for (const provider in speechToTextProviders) {
const providerObj = speechToTextProviders[provider]
if (providerObj.status) {
allowSpeechToText = true
isSpeechToTextEnabled = true
break
}
}
}
let allowImageUploads = false
flowObj.nodes.forEach((node: IReactFlowNode) => {
if (uploadProcessingNodes.indexOf(node.data.name) > -1) {
logger.debug(`[server]: Found Eligible Node ${node.data.type}, Allowing Uploads.`)
let isImageUploadAllowed = false
const nodes: IReactFlowNode[] = flowObj.nodes
// there could be multiple components allowing uploads, so we check if it's already added
/*
* Condition for isImageUploadAllowed
* 1.) one of the uploadAllowedNodes exists
* 2.) one of the uploadProcessingNodes exists + allowImageUploads is ON
*/
if (!nodes.some((node) => uploadAllowedNodes.includes(node.data.name))) {
return res.json({
isSpeechToTextEnabled,
isImageUploadAllowed: false,
imgUploadSizeAndTypes
})
}
nodes.forEach((node: IReactFlowNode) => {
if (uploadProcessingNodes.indexOf(node.data.name) > -1) {
// TODO: for now the maxUploadSize is hardcoded to 5MB, we need to add it to the node properties
node.data.inputParams.map((param: INodeParams) => {
if (param.name === 'allowImageUploads' && node.data.inputs?.['allowImageUploads']) {
allowances.push({
imgUploadSizeAndTypes.push({
fileTypes: 'image/gif;image/jpeg;image/png;image/webp;'.split(';'),
maxUploadSize: 5
})
isImageUploadAllowed = true
}
})
} else if (uploadAllowedNodes.indexOf(node.data.name) > -1 && !allowImageUploads) {
allowImageUploads = true
}
})
return res.json({
allowSpeechToText: allowSpeechToText,
isUploadAllowed: allowImageUploads,
uploadFileSizeAndTypes: allowances
isSpeechToTextEnabled,
isImageUploadAllowed,
imgUploadSizeAndTypes
})
} catch (e) {
return res.status(500).send(e)
@@ -628,12 +681,14 @@ export class App {
if (sessionId) deleteOptions.sessionId = sessionId
if (chatType) deleteOptions.chatType = chatType
try {
/* Delete all multimodal uploads corresponding to this chatflow */
const directory = path.join(getUserHome(), '.flowise', 'gptvision', chatflowid)
deleteFolderRecursive(directory)
} catch (e) {
logger.error(`[server]: Error deleting multimodal uploads: ${e}`)
// Delete all uploads corresponding to this chatflow/chatId
if (chatId) {
try {
const directory = path.join(getStoragePath(), chatflowid, chatId)
deleteFolderRecursive(directory)
} catch (e) {
logger.error(`[server]: Error deleting file storage for chatflow ${chatflowid}, chatId ${chatId}: ${e}`)
}
}
const results = await this.AppDataSource.getRepository(ChatMessage).delete(deleteOptions)
@@ -752,6 +807,12 @@ export class App {
const tool = this.AppDataSource.getRepository(Tool).create(newTool)
const results = await this.AppDataSource.getRepository(Tool).save(tool)
await this.telemetry.sendTelemetry('tool_created', {
version: await getAppVersion(),
toolId: results.id,
toolName: results.name
})
return res.json(results)
})
@@ -958,6 +1019,11 @@ export class App {
const assistant = this.AppDataSource.getRepository(Assistant).create(newAssistant)
const results = await this.AppDataSource.getRepository(Assistant).save(assistant)
await this.telemetry.sendTelemetry('assistant_created', {
version: await getAppVersion(),
assistantId: results.id
})
return res.json(results)
})
@@ -1121,25 +1187,38 @@ export class App {
if (filePath.includes('..')) return res.status(500).send(`Invalid file path`)
//only return from the .flowise openai-assistant folder
if (!(filePath.includes('.flowise') && filePath.includes('openai-assistant'))) return res.status(500).send(`Invalid file path`)
res.setHeader('Content-Disposition', 'attachment; filename=' + path.basename(filePath))
streamFileToUser(res, filePath)
if (fs.existsSync(filePath)) {
res.setHeader('Content-Disposition', 'attachment; filename=' + path.basename(filePath))
streamFileToUser(res, filePath)
} else {
return res.status(404).send(`File ${req.body.fileName} not found`)
}
})
// stream uploaded image
this.app.get('/api/v1/get-upload-file/:id', async (req: Request, res: Response) => {
if (!req.params.id || !req.query.chatId) {
this.app.get('/api/v1/get-upload-file', async (req: Request, res: Response) => {
if (!req.query.chatflowId || !req.query.chatId || !req.query.fileName) {
return res.status(500).send(`Invalid file path`)
}
const filePath = path.join(getUserHome(), '.flowise', 'gptvision', req.query.chatId as string, req.params.id)
const chatflowId = req.query.chatflowId as string
const chatId = req.query.chatId as string
const fileName = req.query.fileName as string
const filePath = path.join(getStoragePath(), chatflowId, chatId, fileName)
//raise error if file path is not absolute
if (!path.isAbsolute(filePath)) return res.status(500).send(`Invalid file path`)
//raise error if file path contains '..'
if (filePath.includes('..')) return res.status(500).send(`Invalid file path`)
//only return from the .flowise gptvision folder
if (!(filePath.includes('.flowise') && filePath.includes('gptvision') && filePath.includes(req.query.chatId as string)))
return res.status(500).send(`Invalid file path`)
res.setHeader('Content-Disposition', 'attachment; filename=' + path.basename(filePath))
streamFileToUser(res, filePath)
//only return from the storage folder
if (!filePath.startsWith(getStoragePath())) return res.status(500).send(`Invalid file path`)
if (fs.existsSync(filePath)) {
res.setHeader('Content-Disposition', 'attachment; filename=' + path.basename(filePath))
streamFileToUser(res, filePath)
} else {
return res.status(404).send(`File ${fileName} not found`)
}
})
// ----------------------------------------
@@ -1192,6 +1271,19 @@ export class App {
}
})
// ----------------------------------------
// Scraper
// ----------------------------------------
this.app.get('/api/v1/fetch-links', async (req: Request, res: Response) => {
const url = decodeURIComponent(req.query.url as string)
const relativeLinksMethod = req.query.relativeLinksMethod as string
if (process.env.DEBUG === 'true') console.info(`Start ${relativeLinksMethod}`)
const links: string[] = relativeLinksMethod === 'webCrawl' ? await webCrawl(url, 0) : await xmlScrape(url, 0)
res.json({ status: 'OK', links })
})
// ----------------------------------------
// Upsert
// ----------------------------------------
@@ -1553,6 +1645,11 @@ export class App {
let chatId = incomingInput.chatId ?? ''
let isUpsert = true
// Get session ID
const memoryNode = findMemoryNode(nodes, edges)
let sessionId = undefined
if (memoryNode) sessionId = getMemorySessionId(memoryNode, incomingInput, chatId, isInternal)
const vsNodes = nodes.filter(
(node) =>
node.data.category === 'Vector Stores' &&
@@ -1590,6 +1687,7 @@ export class App {
incomingInput.question,
chatHistory,
chatId,
sessionId ?? '',
chatflowid,
this.AppDataSource,
incomingInput?.overrideConfig,
@@ -1601,6 +1699,15 @@ export class App {
const startingNodes = nodes.filter((nd) => startingNodeIds.includes(nd.data.id))
this.chatflowPool.add(chatflowid, undefined, startingNodes, incomingInput?.overrideConfig)
await this.telemetry.sendTelemetry('vector_upserted', {
version: await getAppVersion(),
chatlowId: chatflowid,
type: isInternal ? chatType.INTERNAL : chatType.EXTERNAL,
flowGraph: getTelemetryFlowObj(nodes, edges),
stopNodeId
})
return res.status(201).send('Successfully Upserted')
} catch (e: any) {
logger.error('[server]: Error:', e)
@@ -1636,29 +1743,30 @@ export class App {
if (!isKeyValidated) return res.status(401).send('Unauthorized')
}
let fileUploads: IFileUpload[] = []
if (incomingInput.uploads) {
// @ts-ignore
const uploads = incomingInput.uploads as IFileUpload[]
for (const upload of uploads) {
if (upload.type === 'file' || upload.type === 'audio') {
fileUploads = incomingInput.uploads
for (let i = 0; i < fileUploads.length; i += 1) {
const upload = fileUploads[i]
if ((upload.type === 'file' || upload.type === 'audio') && upload.data) {
const filename = upload.name
const dir = path.join(getUserHome(), '.flowise', 'gptvision', chatflowid)
const dir = path.join(getStoragePath(), chatflowid, chatId)
if (!fs.existsSync(dir)) {
fs.mkdirSync(dir, { recursive: true })
}
const filePath = path.join(dir, filename)
const splitDataURI = upload.data.split(',')
const bf = Buffer.from(splitDataURI.pop() || '', 'base64')
//writes data to a file, replacing the file if it already exists.
fs.writeFileSync(filePath, bf)
// don't need to store the file contents in chatmessage, just the filename and chatId
upload.data = chatflowid
// Omit upload.data since we don't store the content in database
upload.type = 'stored-file'
fileUploads[i] = omit(upload, ['data'])
}
// Run Speech to Text conversion
if (upload.mime === 'audio/webm' && incomingInput.uploads?.length === 1) {
//speechToText
let speechToTextConfig: any = {}
let speechToTextConfig: ICommonObject = {}
if (chatflow.speechToText) {
const speechToTextProviders = JSON.parse(chatflow.speechToText)
for (const provider in speechToTextProviders) {
@@ -1672,6 +1780,8 @@ export class App {
}
if (speechToTextConfig) {
const options: ICommonObject = {
chatId,
chatflowid,
appDataSource: this.AppDataSource,
databaseEntities: databaseEntities
}
@@ -1715,6 +1825,12 @@ export class App {
const nodes = parsedFlowData.nodes
const edges = parsedFlowData.edges
// Get session ID
const memoryNode = findMemoryNode(nodes, edges)
const memoryType = memoryNode?.data.label
let sessionId = undefined
if (memoryNode) sessionId = getMemorySessionId(memoryNode, incomingInput, chatId, isInternal)
/* Reuse the flow without having to rebuild (to avoid duplicated upsert, recomputation, reinitialization of memory) when all these conditions met:
* - Node Data already exists in pool
* - Still in sync (i.e the flow has not been modified since)
@@ -1750,29 +1866,39 @@ export class App {
if (!endingNodeIds.length) return res.status(500).send(`Ending nodes not found`)
const endingNodes = nodes.filter((nd) => endingNodeIds.includes(nd.id))
let isEndingNodeExists = endingNodes.find((node) => node.data?.outputs?.output === 'EndingNode')
for (const endingNode of endingNodes) {
const endingNodeData = endingNode.data
if (!endingNodeData) return res.status(500).send(`Ending node ${endingNode.id} data not found`)
if (endingNodeData && endingNodeData.category !== 'Chains' && endingNodeData.category !== 'Agents') {
return res.status(500).send(`Ending node must be either a Chain or Agent`)
}
const isEndingNode = endingNodeData?.outputs?.output === 'EndingNode'
if (
endingNodeData.outputs &&
Object.keys(endingNodeData.outputs).length &&
!Object.values(endingNodeData.outputs).includes(endingNodeData.name)
) {
return res
.status(500)
.send(
`Output of ${endingNodeData.label} (${endingNodeData.id}) must be ${endingNodeData.label}, can't be an Output Prediction`
)
if (!isEndingNode) {
if (endingNodeData && endingNodeData.category !== 'Chains' && endingNodeData.category !== 'Agents') {
return res.status(500).send(`Ending node must be either a Chain or Agent`)
}
if (
endingNodeData.outputs &&
Object.keys(endingNodeData.outputs).length &&
!Object.values(endingNodeData.outputs ?? {}).includes(endingNodeData.name)
) {
return res
.status(500)
.send(
`Output of ${endingNodeData.label} (${endingNodeData.id}) must be ${endingNodeData.label}, can't be an Output Prediction`
)
}
}
isStreamValid = isFlowValidForStream(nodes, endingNodeData)
}
// Once custom function ending node exists, flow is always unavailable to stream
isStreamValid = isEndingNodeExists ? false : isStreamValid
let chatHistory: IMessage[] = incomingInput.history ?? []
// When {{chat_history}} is used in Prompt Template, fetch the chat conversations from memory node
@@ -1824,6 +1950,7 @@ export class App {
incomingInput.question,
chatHistory,
chatId,
sessionId ?? '',
chatflowid,
this.AppDataSource,
incomingInput?.overrideConfig,
@@ -1853,12 +1980,6 @@ export class App {
logger.debug(`[server]: Running ${nodeToExecuteData.label} (${nodeToExecuteData.id})`)
const memoryNode = findMemoryNode(nodes, edges)
const memoryType = memoryNode?.data.label
let sessionId = undefined
if (memoryNode) sessionId = getMemorySessionId(memoryNode, incomingInput, chatId, isInternal)
const nodeInstanceFilePath = this.nodesPool.componentNodes[nodeToExecuteData.name].filePath as string
const nodeModule = await import(nodeInstanceFilePath)
const nodeInstance = new nodeModule.nodeClass({ sessionId })
@@ -1903,7 +2024,7 @@ export class App {
memoryType,
sessionId,
createdDate: userMessageDateTime,
fileUploads: incomingInput.uploads ? JSON.stringify(incomingInput.uploads) : ''
fileUploads: incomingInput.uploads ? JSON.stringify(fileUploads) : undefined
}
await this.addChatMessage(userMessage)
@@ -1924,12 +2045,22 @@ export class App {
if (result?.sourceDocuments) apiMessage.sourceDocuments = JSON.stringify(result.sourceDocuments)
if (result?.usedTools) apiMessage.usedTools = JSON.stringify(result.usedTools)
if (result?.fileAnnotations) apiMessage.fileAnnotations = JSON.stringify(result.fileAnnotations)
await this.addChatMessage(apiMessage)
const chatMessage = await this.addChatMessage(apiMessage)
logger.debug(`[server]: Finished running ${nodeToExecuteData.label} (${nodeToExecuteData.id})`)
await this.telemetry.sendTelemetry('prediction_sent', {
version: await getAppVersion(),
chatlowId: chatflowid,
chatId,
type: isInternal ? chatType.INTERNAL : chatType.EXTERNAL,
flowGraph: getTelemetryFlowObj(nodes, edges)
})
// Only return ChatId when its Internal OR incoming input has ChatId, to avoid confusion when calling API
if (incomingInput.chatId || isInternal) result.chatId = chatId
// Prepare response
result.chatId = chatId
result.chatMessageId = chatMessage.id
if (sessionId) result.sessionId = sessionId
if (memoryType) result.memoryType = memoryType
return res.json(result)
} catch (e: any) {
@@ -1941,6 +2072,7 @@ export class App {
async stopApp() {
try {
const removePromises: any[] = []
removePromises.push(this.telemetry.flush())
await Promise.all(removePromises)
} catch (e) {
logger.error(`❌[server]: Flowise Server shut down error: ${e}`)
@@ -1961,9 +2093,7 @@ export async function start(): Promise<void> {
const server = http.createServer(serverApp.app)
const io = new Server(server, {
cors: {
origin: '*'
}
cors: getCorsOptions()
})
await serverApp.initDatabase()
+25
View File
@@ -18,3 +18,28 @@ export function sanitizeMiddleware(req: Request, res: Response, next: NextFuncti
}
next()
}
export function getAllowedCorsOrigins(): string {
// Expects FQDN separated by commas, otherwise nothing or * for all.
return process.env.CORS_ORIGINS ?? '*'
}
export function getCorsOptions(): any {
const corsOptions = {
origin: function (origin: string | undefined, callback: (err: Error | null, allow?: boolean) => void) {
const allowedOrigins = getAllowedCorsOrigins()
if (!origin || allowedOrigins == '*' || allowedOrigins.indexOf(origin) !== -1) {
callback(null, true)
} else {
callback(null, false)
}
}
}
return corsOptions
}
export function getAllowedIframeOrigins(): string {
// Expects FQDN separated by commas, otherwise nothing or * for all.
// Also CSP allowed values: self or none
return process.env.IFRAME_ORIGINS ?? '*'
}
+77 -6
View File
@@ -273,6 +273,7 @@ export const buildLangchain = async (
question: string,
chatHistory: IMessage[],
chatId: string,
sessionId: string,
chatflowid: string,
appDataSource: DataSource,
overrideConfig?: ICommonObject,
@@ -317,6 +318,7 @@ export const buildLangchain = async (
logger.debug(`[server]: Upserting ${reactFlowNode.data.label} (${reactFlowNode.data.id})`)
await newNodeInstance.vectorStoreMethods!['upsert']!.call(newNodeInstance, reactFlowNodeData, {
chatId,
sessionId,
chatflowid,
chatHistory,
logger,
@@ -331,6 +333,7 @@ export const buildLangchain = async (
logger.debug(`[server]: Initializing ${reactFlowNode.data.label} (${reactFlowNode.data.id})`)
let outputResult = await newNodeInstance.init(reactFlowNodeData, question, {
chatId,
sessionId,
chatflowid,
chatHistory,
logger,
@@ -607,28 +610,35 @@ export const resolveVariables = (
export const replaceInputsWithConfig = (flowNodeData: INodeData, overrideConfig: ICommonObject) => {
const types = 'inputs'
const getParamValues = (paramsObj: ICommonObject) => {
const getParamValues = (inputsObj: ICommonObject) => {
for (const config in overrideConfig) {
// If overrideConfig[key] is object
if (overrideConfig[config] && typeof overrideConfig[config] === 'object') {
const nodeIds = Object.keys(overrideConfig[config])
if (nodeIds.includes(flowNodeData.id)) {
paramsObj[config] = overrideConfig[config][flowNodeData.id]
inputsObj[config] = overrideConfig[config][flowNodeData.id]
continue
} else if (nodeIds.some((nodeId) => nodeId.includes(flowNodeData.name))) {
/*
* "systemMessagePrompt": {
* "chatPromptTemplate_0": "You are an assistant" <---- continue for loop if current node is chatPromptTemplate_1
* }
*/
continue
}
}
let paramValue = overrideConfig[config] ?? paramsObj[config]
let paramValue = overrideConfig[config] ?? inputsObj[config]
// Check if boolean
if (paramValue === 'true') paramValue = true
else if (paramValue === 'false') paramValue = false
paramsObj[config] = paramValue
inputsObj[config] = paramValue
}
}
const paramsObj = flowNodeData[types] ?? {}
const inputsObj = flowNodeData[types] ?? {}
getParamValues(paramsObj)
getParamValues(inputsObj)
return flowNodeData
}
@@ -1079,6 +1089,10 @@ export const getAllValuesFromJson = (obj: any): any[] => {
return values
}
/**
* Delete file & folder recursively
* @param {string} directory
*/
export const deleteFolderRecursive = (directory: string) => {
if (fs.existsSync(directory)) {
fs.readdir(directory, (error, files) => {
@@ -1102,3 +1116,60 @@ export const deleteFolderRecursive = (directory: string) => {
})
}
}
/**
* Get only essential flow data items for telemetry
* @param {IReactFlowNode[]} nodes
* @param {IReactFlowEdge[]} edges
*/
export const getTelemetryFlowObj = (nodes: IReactFlowNode[], edges: IReactFlowEdge[]) => {
const nodeData = nodes.map((node) => node.id)
const edgeData = edges.map((edge) => ({ source: edge.source, target: edge.target }))
return { nodes: nodeData, edges: edgeData }
}
/**
* Get user settings file
* TODO: move env variables to settings json file, easier configuration
*/
export const getUserSettingsFilePath = () => {
if (process.env.SECRETKEY_PATH) return path.join(process.env.SECRETKEY_PATH, 'settings.json')
const checkPaths = [path.join(getUserHome(), '.flowise', 'settings.json')]
for (const checkPath of checkPaths) {
if (fs.existsSync(checkPath)) {
return checkPath
}
}
return ''
}
/**
* Get app current version
*/
export const getAppVersion = async () => {
const getPackageJsonPath = (): string => {
const checkPaths = [
path.join(__dirname, '..', 'package.json'),
path.join(__dirname, '..', '..', 'package.json'),
path.join(__dirname, '..', '..', '..', 'package.json'),
path.join(__dirname, '..', '..', '..', '..', 'package.json'),
path.join(__dirname, '..', '..', '..', '..', '..', 'package.json')
]
for (const checkPath of checkPaths) {
if (fs.existsSync(checkPath)) {
return checkPath
}
}
return ''
}
const packagejsonPath = getPackageJsonPath()
if (!packagejsonPath) return ''
try {
const content = await fs.promises.readFile(packagejsonPath, 'utf8')
const parsedContent = JSON.parse(content)
return parsedContent.version
} catch (error) {
return ''
}
}
+52
View File
@@ -0,0 +1,52 @@
import { v4 as uuidv4 } from 'uuid'
import { PostHog } from 'posthog-node'
import path from 'path'
import fs from 'fs'
import { getUserHome, getUserSettingsFilePath } from '.'
export class Telemetry {
postHog?: PostHog
constructor() {
if (process.env.DISABLE_FLOWISE_TELEMETRY !== 'true') {
this.postHog = new PostHog('phc_jEDuFYnOnuXsws986TLWzuisbRjwFqTl9JL8tDMgqme')
} else {
this.postHog = undefined
}
}
async id(): Promise<string> {
try {
const settingsContent = await fs.promises.readFile(getUserSettingsFilePath(), 'utf8')
const settings = JSON.parse(settingsContent)
return settings.instanceId
} catch (error) {
const instanceId = uuidv4()
const settings = {
instanceId
}
const defaultLocation = process.env.SECRETKEY_PATH
? path.join(process.env.SECRETKEY_PATH, 'settings.json')
: path.join(getUserHome(), '.flowise', 'settings.json')
await fs.promises.writeFile(defaultLocation, JSON.stringify(settings, null, 2))
return instanceId
}
}
async sendTelemetry(event: string, properties = {}): Promise<void> {
if (this.postHog) {
const distinctId = await this.id()
this.postHog.capture({
event,
distinctId,
properties
})
}
}
async flush(): Promise<void> {
if (this.postHog) {
await this.postHog.shutdownAsync()
}
}
}