mirror of
https://github.com/farcasclaudiu/Flowise.git
synced 2026-06-29 01:01:11 +03:00
Merge branch 'main' of github.com:0xi4o/Flowise into feature/scrapped-links
This commit is contained in:
@@ -39,7 +39,8 @@ export default class Start extends Command {
|
||||
LANGCHAIN_TRACING_V2: Flags.string(),
|
||||
LANGCHAIN_ENDPOINT: Flags.string(),
|
||||
LANGCHAIN_API_KEY: Flags.string(),
|
||||
LANGCHAIN_PROJECT: Flags.string()
|
||||
LANGCHAIN_PROJECT: Flags.string(),
|
||||
DISABLE_FLOWISE_TELEMETRY: Flags.string()
|
||||
}
|
||||
|
||||
async stopProcess() {
|
||||
@@ -113,6 +114,9 @@ export default class Start extends Command {
|
||||
if (flags.LANGCHAIN_API_KEY) process.env.LANGCHAIN_API_KEY = flags.LANGCHAIN_API_KEY
|
||||
if (flags.LANGCHAIN_PROJECT) process.env.LANGCHAIN_PROJECT = flags.LANGCHAIN_PROJECT
|
||||
|
||||
// Telemetry
|
||||
if (flags.DISABLE_FLOWISE_TELEMETRY) process.env.DISABLE_FLOWISE_TELEMETRY = flags.DISABLE_FLOWISE_TELEMETRY
|
||||
|
||||
await (async () => {
|
||||
try {
|
||||
logger.info('Starting Flowise...')
|
||||
|
||||
@@ -45,7 +45,9 @@ import {
|
||||
getSessionChatHistory,
|
||||
getAllConnectedNodes,
|
||||
clearSessionMemory,
|
||||
findMemoryNode
|
||||
findMemoryNode,
|
||||
getTelemetryFlowObj,
|
||||
getAppVersion
|
||||
} from './utils'
|
||||
import { cloneDeep, omit, uniqWith, isEqual } from 'lodash'
|
||||
import { getDataSource } from './DataSource'
|
||||
@@ -64,6 +66,7 @@ import { sanitizeMiddleware } from './utils/XSS'
|
||||
import axios from 'axios'
|
||||
import { Client } from 'langchainhub'
|
||||
import { parsePrompt } from './utils/hub'
|
||||
import { Telemetry } from './utils/telemetry'
|
||||
import { Variable } from './database/entities/Variable'
|
||||
|
||||
export class App {
|
||||
@@ -71,6 +74,7 @@ export class App {
|
||||
nodesPool: NodesPool
|
||||
chatflowPool: ChatflowPool
|
||||
cachePool: CachePool
|
||||
telemetry: Telemetry
|
||||
AppDataSource = getDataSource()
|
||||
|
||||
constructor() {
|
||||
@@ -105,6 +109,9 @@ export class App {
|
||||
|
||||
// Initialize cache pool
|
||||
this.cachePool = new CachePool()
|
||||
|
||||
// Initialize telemetry
|
||||
this.telemetry = new Telemetry()
|
||||
})
|
||||
.catch((err) => {
|
||||
logger.error('❌ [server]: Error during Data Source initialization:', err)
|
||||
@@ -294,7 +301,13 @@ export class App {
|
||||
const nodeModule = await import(nodeInstanceFilePath)
|
||||
const newNodeInstance = new nodeModule.nodeClass()
|
||||
|
||||
const returnData = await newNodeInstance.init(nodeData)
|
||||
const options: ICommonObject = {
|
||||
appDataSource: this.AppDataSource,
|
||||
databaseEntities,
|
||||
logger
|
||||
}
|
||||
|
||||
const returnData = await newNodeInstance.init(nodeData, '', options)
|
||||
const result = typeof returnData === 'string' ? handleEscapeCharacters(returnData, true) : returnData
|
||||
|
||||
return res.json(result)
|
||||
@@ -382,6 +395,12 @@ export class App {
|
||||
const chatflow = this.AppDataSource.getRepository(ChatFlow).create(newChatFlow)
|
||||
const results = await this.AppDataSource.getRepository(ChatFlow).save(chatflow)
|
||||
|
||||
await this.telemetry.sendTelemetry('chatflow_created', {
|
||||
version: await getAppVersion(),
|
||||
chatlowId: results.id,
|
||||
flowGraph: getTelemetryFlowObj(JSON.parse(results.flowData)?.nodes, JSON.parse(results.flowData)?.edges)
|
||||
})
|
||||
|
||||
return res.json(results)
|
||||
})
|
||||
|
||||
@@ -668,6 +687,12 @@ export class App {
|
||||
const tool = this.AppDataSource.getRepository(Tool).create(newTool)
|
||||
const results = await this.AppDataSource.getRepository(Tool).save(tool)
|
||||
|
||||
await this.telemetry.sendTelemetry('tool_created', {
|
||||
version: await getAppVersion(),
|
||||
toolId: results.id,
|
||||
toolName: results.name
|
||||
})
|
||||
|
||||
return res.json(results)
|
||||
})
|
||||
|
||||
@@ -874,6 +899,11 @@ export class App {
|
||||
const assistant = this.AppDataSource.getRepository(Assistant).create(newAssistant)
|
||||
const results = await this.AppDataSource.getRepository(Assistant).save(assistant)
|
||||
|
||||
await this.telemetry.sendTelemetry('assistant_created', {
|
||||
version: await getAppVersion(),
|
||||
assistantId: results.id
|
||||
})
|
||||
|
||||
return res.json(results)
|
||||
})
|
||||
|
||||
@@ -1461,6 +1491,11 @@ export class App {
|
||||
let chatId = incomingInput.chatId ?? ''
|
||||
let isUpsert = true
|
||||
|
||||
// Get session ID
|
||||
const memoryNode = findMemoryNode(nodes, edges)
|
||||
let sessionId = undefined
|
||||
if (memoryNode) sessionId = getMemorySessionId(memoryNode, incomingInput, chatId, isInternal)
|
||||
|
||||
const vsNodes = nodes.filter(
|
||||
(node) =>
|
||||
node.data.category === 'Vector Stores' &&
|
||||
@@ -1498,6 +1533,7 @@ export class App {
|
||||
incomingInput.question,
|
||||
chatHistory,
|
||||
chatId,
|
||||
sessionId ?? '',
|
||||
chatflowid,
|
||||
this.AppDataSource,
|
||||
incomingInput?.overrideConfig,
|
||||
@@ -1509,6 +1545,15 @@ export class App {
|
||||
const startingNodes = nodes.filter((nd) => startingNodeIds.includes(nd.data.id))
|
||||
|
||||
this.chatflowPool.add(chatflowid, undefined, startingNodes, incomingInput?.overrideConfig)
|
||||
|
||||
await this.telemetry.sendTelemetry('vector_upserted', {
|
||||
version: await getAppVersion(),
|
||||
chatlowId: chatflowid,
|
||||
type: isInternal ? chatType.INTERNAL : chatType.EXTERNAL,
|
||||
flowGraph: getTelemetryFlowObj(nodes, edges),
|
||||
stopNodeId
|
||||
})
|
||||
|
||||
return res.status(201).send('Successfully Upserted')
|
||||
} catch (e: any) {
|
||||
logger.error('[server]: Error:', e)
|
||||
@@ -1575,6 +1620,12 @@ export class App {
|
||||
const nodes = parsedFlowData.nodes
|
||||
const edges = parsedFlowData.edges
|
||||
|
||||
// Get session ID
|
||||
const memoryNode = findMemoryNode(nodes, edges)
|
||||
const memoryType = memoryNode?.data.label
|
||||
let sessionId = undefined
|
||||
if (memoryNode) sessionId = getMemorySessionId(memoryNode, incomingInput, chatId, isInternal)
|
||||
|
||||
/* Reuse the flow without having to rebuild (to avoid duplicated upsert, recomputation, reinitialization of memory) when all these conditions met:
|
||||
* - Node Data already exists in pool
|
||||
* - Still in sync (i.e the flow has not been modified since)
|
||||
@@ -1684,6 +1735,7 @@ export class App {
|
||||
incomingInput.question,
|
||||
chatHistory,
|
||||
chatId,
|
||||
sessionId ?? '',
|
||||
chatflowid,
|
||||
this.AppDataSource,
|
||||
incomingInput?.overrideConfig,
|
||||
@@ -1713,12 +1765,6 @@ export class App {
|
||||
|
||||
logger.debug(`[server]: Running ${nodeToExecuteData.label} (${nodeToExecuteData.id})`)
|
||||
|
||||
const memoryNode = findMemoryNode(nodes, edges)
|
||||
const memoryType = memoryNode?.data.label
|
||||
|
||||
let sessionId = undefined
|
||||
if (memoryNode) sessionId = getMemorySessionId(memoryNode, incomingInput, chatId, isInternal)
|
||||
|
||||
const nodeInstanceFilePath = this.nodesPool.componentNodes[nodeToExecuteData.name].filePath as string
|
||||
const nodeModule = await import(nodeInstanceFilePath)
|
||||
const nodeInstance = new nodeModule.nodeClass({ sessionId })
|
||||
@@ -1781,12 +1827,22 @@ export class App {
|
||||
if (result?.sourceDocuments) apiMessage.sourceDocuments = JSON.stringify(result.sourceDocuments)
|
||||
if (result?.usedTools) apiMessage.usedTools = JSON.stringify(result.usedTools)
|
||||
if (result?.fileAnnotations) apiMessage.fileAnnotations = JSON.stringify(result.fileAnnotations)
|
||||
await this.addChatMessage(apiMessage)
|
||||
const chatMessage = await this.addChatMessage(apiMessage)
|
||||
result.chatMessageId = chatMessage.id
|
||||
|
||||
logger.debug(`[server]: Finished running ${nodeToExecuteData.label} (${nodeToExecuteData.id})`)
|
||||
await this.telemetry.sendTelemetry('prediction_sent', {
|
||||
version: await getAppVersion(),
|
||||
chatlowId: chatflowid,
|
||||
chatId,
|
||||
type: isInternal ? chatType.INTERNAL : chatType.EXTERNAL,
|
||||
flowGraph: getTelemetryFlowObj(nodes, edges)
|
||||
})
|
||||
|
||||
// Only return ChatId when its Internal OR incoming input has ChatId, to avoid confusion when calling API
|
||||
if (incomingInput.chatId || isInternal) result.chatId = chatId
|
||||
// Prepare response
|
||||
result.chatId = chatId
|
||||
if (sessionId) result.sessionId = sessionId
|
||||
if (memoryType) result.memoryType = memoryType
|
||||
|
||||
return res.json(result)
|
||||
} catch (e: any) {
|
||||
@@ -1798,6 +1854,7 @@ export class App {
|
||||
async stopApp() {
|
||||
try {
|
||||
const removePromises: any[] = []
|
||||
removePromises.push(this.telemetry.flush())
|
||||
await Promise.all(removePromises)
|
||||
} catch (e) {
|
||||
logger.error(`❌[server]: Flowise Server shut down error: ${e}`)
|
||||
|
||||
@@ -273,6 +273,7 @@ export const buildLangchain = async (
|
||||
question: string,
|
||||
chatHistory: IMessage[],
|
||||
chatId: string,
|
||||
sessionId: string,
|
||||
chatflowid: string,
|
||||
appDataSource: DataSource,
|
||||
overrideConfig?: ICommonObject,
|
||||
@@ -317,6 +318,7 @@ export const buildLangchain = async (
|
||||
logger.debug(`[server]: Upserting ${reactFlowNode.data.label} (${reactFlowNode.data.id})`)
|
||||
await newNodeInstance.vectorStoreMethods!['upsert']!.call(newNodeInstance, reactFlowNodeData, {
|
||||
chatId,
|
||||
sessionId,
|
||||
chatflowid,
|
||||
chatHistory,
|
||||
logger,
|
||||
@@ -331,6 +333,7 @@ export const buildLangchain = async (
|
||||
logger.debug(`[server]: Initializing ${reactFlowNode.data.label} (${reactFlowNode.data.id})`)
|
||||
let outputResult = await newNodeInstance.init(reactFlowNodeData, question, {
|
||||
chatId,
|
||||
sessionId,
|
||||
chatflowid,
|
||||
chatHistory,
|
||||
logger,
|
||||
@@ -1079,3 +1082,60 @@ export const getAllValuesFromJson = (obj: any): any[] => {
|
||||
extractValues(obj)
|
||||
return values
|
||||
}
|
||||
|
||||
/**
|
||||
* Get only essential flow data items for telemetry
|
||||
* @param {IReactFlowNode[]} nodes
|
||||
* @param {IReactFlowEdge[]} edges
|
||||
*/
|
||||
export const getTelemetryFlowObj = (nodes: IReactFlowNode[], edges: IReactFlowEdge[]) => {
|
||||
const nodeData = nodes.map((node) => node.id)
|
||||
const edgeData = edges.map((edge) => ({ source: edge.source, target: edge.target }))
|
||||
return { nodes: nodeData, edges: edgeData }
|
||||
}
|
||||
|
||||
/**
|
||||
* Get user settings file
|
||||
* TODO: move env variables to settings json file, easier configuration
|
||||
*/
|
||||
export const getUserSettingsFilePath = () => {
|
||||
if (process.env.SECRETKEY_PATH) return path.join(process.env.SECRETKEY_PATH, 'settings.json')
|
||||
const checkPaths = [path.join(getUserHome(), '.flowise', 'settings.json')]
|
||||
for (const checkPath of checkPaths) {
|
||||
if (fs.existsSync(checkPath)) {
|
||||
return checkPath
|
||||
}
|
||||
}
|
||||
return ''
|
||||
}
|
||||
|
||||
/**
|
||||
* Get app current version
|
||||
*/
|
||||
export const getAppVersion = async () => {
|
||||
const getPackageJsonPath = (): string => {
|
||||
const checkPaths = [
|
||||
path.join(__dirname, '..', 'package.json'),
|
||||
path.join(__dirname, '..', '..', 'package.json'),
|
||||
path.join(__dirname, '..', '..', '..', 'package.json'),
|
||||
path.join(__dirname, '..', '..', '..', '..', 'package.json'),
|
||||
path.join(__dirname, '..', '..', '..', '..', '..', 'package.json')
|
||||
]
|
||||
for (const checkPath of checkPaths) {
|
||||
if (fs.existsSync(checkPath)) {
|
||||
return checkPath
|
||||
}
|
||||
}
|
||||
return ''
|
||||
}
|
||||
|
||||
const packagejsonPath = getPackageJsonPath()
|
||||
if (!packagejsonPath) return ''
|
||||
try {
|
||||
const content = await fs.promises.readFile(packagejsonPath, 'utf8')
|
||||
const parsedContent = JSON.parse(content)
|
||||
return parsedContent.version
|
||||
} catch (error) {
|
||||
return ''
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,52 @@
|
||||
import { v4 as uuidv4 } from 'uuid'
|
||||
import { PostHog } from 'posthog-node'
|
||||
import path from 'path'
|
||||
import fs from 'fs'
|
||||
import { getUserHome, getUserSettingsFilePath } from '.'
|
||||
|
||||
export class Telemetry {
|
||||
postHog?: PostHog
|
||||
|
||||
constructor() {
|
||||
if (process.env.DISABLE_FLOWISE_TELEMETRY !== 'true') {
|
||||
this.postHog = new PostHog('phc_jEDuFYnOnuXsws986TLWzuisbRjwFqTl9JL8tDMgqme')
|
||||
} else {
|
||||
this.postHog = undefined
|
||||
}
|
||||
}
|
||||
|
||||
async id(): Promise<string> {
|
||||
try {
|
||||
const settingsContent = await fs.promises.readFile(getUserSettingsFilePath(), 'utf8')
|
||||
const settings = JSON.parse(settingsContent)
|
||||
return settings.instanceId
|
||||
} catch (error) {
|
||||
const instanceId = uuidv4()
|
||||
const settings = {
|
||||
instanceId
|
||||
}
|
||||
const defaultLocation = process.env.SECRETKEY_PATH
|
||||
? path.join(process.env.SECRETKEY_PATH, 'settings.json')
|
||||
: path.join(getUserHome(), '.flowise', 'settings.json')
|
||||
await fs.promises.writeFile(defaultLocation, JSON.stringify(settings, null, 2))
|
||||
return instanceId
|
||||
}
|
||||
}
|
||||
|
||||
async sendTelemetry(event: string, properties = {}): Promise<void> {
|
||||
if (this.postHog) {
|
||||
const distinctId = await this.id()
|
||||
this.postHog.capture({
|
||||
event,
|
||||
distinctId,
|
||||
properties
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
async flush(): Promise<void> {
|
||||
if (this.postHog) {
|
||||
await this.postHog.shutdownAsync()
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user