From 953e1468bbfb2e51d4e958c4a4e8389fc6174a09 Mon Sep 17 00:00:00 2001 From: Henry Date: Fri, 19 Jan 2024 00:02:31 +0000 Subject: [PATCH] add telemetry --- packages/server/package.json | 1 + packages/server/src/index.ts | 44 +++++++++++++++++++- packages/server/src/utils/index.ts | 56 ++++++++++++++++++++++++++ packages/server/src/utils/telemetry.ts | 50 +++++++++++++++++++++++ 4 files changed, 150 insertions(+), 1 deletion(-) create mode 100644 packages/server/src/utils/telemetry.ts diff --git a/packages/server/package.json b/packages/server/package.json index 79ff4961..fe27f8d6 100644 --- a/packages/server/package.json +++ b/packages/server/package.json @@ -60,6 +60,7 @@ "multer": "^1.4.5-lts.1", "mysql": "^2.18.1", "pg": "^8.11.1", + "posthog-node": "^3.5.0", "reflect-metadata": "^0.1.13", "sanitize-html": "^2.11.0", "socket.io": "^4.6.1", diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts index 1986d207..c64333dd 100644 --- a/packages/server/src/index.ts +++ b/packages/server/src/index.ts @@ -45,7 +45,9 @@ import { getSessionChatHistory, getAllConnectedNodes, clearSessionMemory, - findMemoryNode + findMemoryNode, + getTelemetryFlowObj, + getAppVersion } from './utils' import { cloneDeep, omit, uniqWith, isEqual } from 'lodash' import { getDataSource } from './DataSource' @@ -64,6 +66,7 @@ import { sanitizeMiddleware } from './utils/XSS' import axios from 'axios' import { Client } from 'langchainhub' import { parsePrompt } from './utils/hub' +import { Telemetry } from './utils/telemetry' import { Variable } from './database/entities/Variable' export class App { @@ -71,6 +74,7 @@ export class App { nodesPool: NodesPool chatflowPool: ChatflowPool cachePool: CachePool + telemetry: Telemetry AppDataSource = getDataSource() constructor() { @@ -105,6 +109,9 @@ export class App { // Initialize cache pool this.cachePool = new CachePool() + + // Initialize telemetry + this.telemetry = new Telemetry() }) .catch((err) => { logger.error('❌ [server]: Error during Data Source initialization:', err) @@ -388,6 +395,12 @@ export class App { const chatflow = this.AppDataSource.getRepository(ChatFlow).create(newChatFlow) const results = await this.AppDataSource.getRepository(ChatFlow).save(chatflow) + await this.telemetry.sendTelemetry('chatflow_created', { + version: await getAppVersion(), + chatlowId: results.id, + flowGraph: getTelemetryFlowObj(JSON.parse(results.flowData)?.nodes, JSON.parse(results.flowData)?.edges) + }) + return res.json(results) }) @@ -674,6 +687,12 @@ export class App { const tool = this.AppDataSource.getRepository(Tool).create(newTool) const results = await this.AppDataSource.getRepository(Tool).save(tool) + await this.telemetry.sendTelemetry('tool_created', { + version: await getAppVersion(), + toolId: results.id, + toolName: results.name + }) + return res.json(results) }) @@ -880,6 +899,11 @@ export class App { const assistant = this.AppDataSource.getRepository(Assistant).create(newAssistant) const results = await this.AppDataSource.getRepository(Assistant).save(assistant) + await this.telemetry.sendTelemetry('assistant_created', { + version: await getAppVersion(), + assistantId: results.id + }) + return res.json(results) }) @@ -1508,6 +1532,15 @@ export class App { const startingNodes = nodes.filter((nd) => startingNodeIds.includes(nd.data.id)) this.chatflowPool.add(chatflowid, undefined, startingNodes, incomingInput?.overrideConfig) + + await this.telemetry.sendTelemetry('vector_upserted', { + version: await getAppVersion(), + chatlowId: chatflowid, + type: isInternal ? chatType.INTERNAL : chatType.EXTERNAL, + flowGraph: getTelemetryFlowObj(nodes, edges), + stopNodeId + }) + return res.status(201).send('Successfully Upserted') } catch (e: any) { logger.error('[server]: Error:', e) @@ -1784,6 +1817,14 @@ export class App { await this.addChatMessage(apiMessage) logger.debug(`[server]: Finished running ${nodeToExecuteData.label} (${nodeToExecuteData.id})`) + await this.telemetry.sendTelemetry('prediction_sent', { + version: await getAppVersion(), + chatlowId: chatflowid, + chatId, + sessionId, + type: isInternal ? chatType.INTERNAL : chatType.EXTERNAL, + flowGraph: getTelemetryFlowObj(nodes, edges) + }) // Only return ChatId when its Internal OR incoming input has ChatId, to avoid confusion when calling API if (incomingInput.chatId || isInternal) result.chatId = chatId @@ -1798,6 +1839,7 @@ export class App { async stopApp() { try { const removePromises: any[] = [] + removePromises.push(this.telemetry.flush()) await Promise.all(removePromises) } catch (e) { logger.error(`❌[server]: Flowise Server shut down error: ${e}`) diff --git a/packages/server/src/utils/index.ts b/packages/server/src/utils/index.ts index 2d6acf58..a232094e 100644 --- a/packages/server/src/utils/index.ts +++ b/packages/server/src/utils/index.ts @@ -1082,3 +1082,59 @@ export const getAllValuesFromJson = (obj: any): any[] => { extractValues(obj) return values } + +/** + * Get only essential flow data items for telemetry + * @param {IReactFlowNode[]} nodes + * @param {IReactFlowEdge[]} edges + */ +export const getTelemetryFlowObj = (nodes: IReactFlowNode[], edges: IReactFlowEdge[]) => { + const nodeData = nodes.map((node) => node.id) + const edgeData = edges.map((edge) => ({ source: edge.source, target: edge.target })) + return { nodes: nodeData, edges: edgeData } +} + +/** + * Get user settings file + * TODO: move env variables to settings json file, easier configuration + */ +export const getUserSettingsFilePath = () => { + const checkPaths = [path.join(getUserHome(), '.flowise', 'settings.json')] + for (const checkPath of checkPaths) { + if (fs.existsSync(checkPath)) { + return checkPath + } + } + return '' +} + +/** + * Get app current version + */ +export const getAppVersion = async () => { + const getPackageJsonPath = (): string => { + const checkPaths = [ + path.join(__dirname, '..', 'package.json'), + path.join(__dirname, '..', '..', 'package.json'), + path.join(__dirname, '..', '..', '..', 'package.json'), + path.join(__dirname, '..', '..', '..', '..', 'package.json'), + path.join(__dirname, '..', '..', '..', '..', '..', 'package.json') + ] + for (const checkPath of checkPaths) { + if (fs.existsSync(checkPath)) { + return checkPath + } + } + return '' + } + + const packagejsonPath = getPackageJsonPath() + if (!packagejsonPath) return '' + try { + const content = await fs.promises.readFile(packagejsonPath, 'utf8') + const parsedContent = JSON.parse(content) + return parsedContent.version + } catch (error) { + return '' + } +} diff --git a/packages/server/src/utils/telemetry.ts b/packages/server/src/utils/telemetry.ts new file mode 100644 index 00000000..4254ea76 --- /dev/null +++ b/packages/server/src/utils/telemetry.ts @@ -0,0 +1,50 @@ +import { v4 as uuidv4 } from 'uuid' +import { PostHog } from 'posthog-node' +import path from 'path' +import fs from 'fs' +import { getUserHome, getUserSettingsFilePath } from '.' + +export class Telemetry { + postHog?: PostHog + + constructor() { + if (process.env.DISABLE_FLOWISE_TELEMETRY !== 'true') { + this.postHog = new PostHog('phc_jEDuFYnOnuXsws986TLWzuisbRjwFqTl9JL8tDMgqme') + } else { + this.postHog = undefined + } + } + + async id(): Promise { + try { + const settingsContent = await fs.promises.readFile(getUserSettingsFilePath(), 'utf8') + const settings = JSON.parse(settingsContent) + return settings.instanceId + } catch (error) { + const instanceId = uuidv4() + const settings = { + instanceId + } + const defaultLocation = path.join(getUserHome(), '.flowise', 'settings.json') + await fs.promises.writeFile(defaultLocation, JSON.stringify(settings, null, 2)) + return instanceId + } + } + + async sendTelemetry(event: string, properties = {}): Promise { + if (this.postHog) { + const distinctId = await this.id() + this.postHog.capture({ + event, + distinctId, + properties + }) + } + } + + async flush(): Promise { + if (this.postHog) { + await this.postHog.shutdownAsync() + } + } +}