add telemetry

This commit is contained in:
Henry
2024-01-19 00:02:31 +00:00
parent 4ca7bc058e
commit 953e1468bb
4 changed files with 150 additions and 1 deletions
+1
View File
@@ -60,6 +60,7 @@
"multer": "^1.4.5-lts.1",
"mysql": "^2.18.1",
"pg": "^8.11.1",
"posthog-node": "^3.5.0",
"reflect-metadata": "^0.1.13",
"sanitize-html": "^2.11.0",
"socket.io": "^4.6.1",
+43 -1
View File
@@ -45,7 +45,9 @@ import {
getSessionChatHistory,
getAllConnectedNodes,
clearSessionMemory,
findMemoryNode
findMemoryNode,
getTelemetryFlowObj,
getAppVersion
} from './utils'
import { cloneDeep, omit, uniqWith, isEqual } from 'lodash'
import { getDataSource } from './DataSource'
@@ -64,6 +66,7 @@ import { sanitizeMiddleware } from './utils/XSS'
import axios from 'axios'
import { Client } from 'langchainhub'
import { parsePrompt } from './utils/hub'
import { Telemetry } from './utils/telemetry'
import { Variable } from './database/entities/Variable'
export class App {
@@ -71,6 +74,7 @@ export class App {
nodesPool: NodesPool
chatflowPool: ChatflowPool
cachePool: CachePool
telemetry: Telemetry
AppDataSource = getDataSource()
constructor() {
@@ -105,6 +109,9 @@ export class App {
// Initialize cache pool
this.cachePool = new CachePool()
// Initialize telemetry
this.telemetry = new Telemetry()
})
.catch((err) => {
logger.error('❌ [server]: Error during Data Source initialization:', err)
@@ -388,6 +395,12 @@ export class App {
const chatflow = this.AppDataSource.getRepository(ChatFlow).create(newChatFlow)
const results = await this.AppDataSource.getRepository(ChatFlow).save(chatflow)
await this.telemetry.sendTelemetry('chatflow_created', {
version: await getAppVersion(),
chatlowId: results.id,
flowGraph: getTelemetryFlowObj(JSON.parse(results.flowData)?.nodes, JSON.parse(results.flowData)?.edges)
})
return res.json(results)
})
@@ -674,6 +687,12 @@ export class App {
const tool = this.AppDataSource.getRepository(Tool).create(newTool)
const results = await this.AppDataSource.getRepository(Tool).save(tool)
await this.telemetry.sendTelemetry('tool_created', {
version: await getAppVersion(),
toolId: results.id,
toolName: results.name
})
return res.json(results)
})
@@ -880,6 +899,11 @@ export class App {
const assistant = this.AppDataSource.getRepository(Assistant).create(newAssistant)
const results = await this.AppDataSource.getRepository(Assistant).save(assistant)
await this.telemetry.sendTelemetry('assistant_created', {
version: await getAppVersion(),
assistantId: results.id
})
return res.json(results)
})
@@ -1508,6 +1532,15 @@ export class App {
const startingNodes = nodes.filter((nd) => startingNodeIds.includes(nd.data.id))
this.chatflowPool.add(chatflowid, undefined, startingNodes, incomingInput?.overrideConfig)
await this.telemetry.sendTelemetry('vector_upserted', {
version: await getAppVersion(),
chatlowId: chatflowid,
type: isInternal ? chatType.INTERNAL : chatType.EXTERNAL,
flowGraph: getTelemetryFlowObj(nodes, edges),
stopNodeId
})
return res.status(201).send('Successfully Upserted')
} catch (e: any) {
logger.error('[server]: Error:', e)
@@ -1784,6 +1817,14 @@ export class App {
await this.addChatMessage(apiMessage)
logger.debug(`[server]: Finished running ${nodeToExecuteData.label} (${nodeToExecuteData.id})`)
await this.telemetry.sendTelemetry('prediction_sent', {
version: await getAppVersion(),
chatlowId: chatflowid,
chatId,
sessionId,
type: isInternal ? chatType.INTERNAL : chatType.EXTERNAL,
flowGraph: getTelemetryFlowObj(nodes, edges)
})
// Only return ChatId when its Internal OR incoming input has ChatId, to avoid confusion when calling API
if (incomingInput.chatId || isInternal) result.chatId = chatId
@@ -1798,6 +1839,7 @@ export class App {
async stopApp() {
try {
const removePromises: any[] = []
removePromises.push(this.telemetry.flush())
await Promise.all(removePromises)
} catch (e) {
logger.error(`❌[server]: Flowise Server shut down error: ${e}`)
+56
View File
@@ -1082,3 +1082,59 @@ export const getAllValuesFromJson = (obj: any): any[] => {
extractValues(obj)
return values
}
/**
* Get only essential flow data items for telemetry
* @param {IReactFlowNode[]} nodes
* @param {IReactFlowEdge[]} edges
*/
export const getTelemetryFlowObj = (nodes: IReactFlowNode[], edges: IReactFlowEdge[]) => {
const nodeData = nodes.map((node) => node.id)
const edgeData = edges.map((edge) => ({ source: edge.source, target: edge.target }))
return { nodes: nodeData, edges: edgeData }
}
/**
* Get user settings file
* TODO: move env variables to settings json file, easier configuration
*/
export const getUserSettingsFilePath = () => {
const checkPaths = [path.join(getUserHome(), '.flowise', 'settings.json')]
for (const checkPath of checkPaths) {
if (fs.existsSync(checkPath)) {
return checkPath
}
}
return ''
}
/**
* Get app current version
*/
export const getAppVersion = async () => {
const getPackageJsonPath = (): string => {
const checkPaths = [
path.join(__dirname, '..', 'package.json'),
path.join(__dirname, '..', '..', 'package.json'),
path.join(__dirname, '..', '..', '..', 'package.json'),
path.join(__dirname, '..', '..', '..', '..', 'package.json'),
path.join(__dirname, '..', '..', '..', '..', '..', 'package.json')
]
for (const checkPath of checkPaths) {
if (fs.existsSync(checkPath)) {
return checkPath
}
}
return ''
}
const packagejsonPath = getPackageJsonPath()
if (!packagejsonPath) return ''
try {
const content = await fs.promises.readFile(packagejsonPath, 'utf8')
const parsedContent = JSON.parse(content)
return parsedContent.version
} catch (error) {
return ''
}
}
+50
View File
@@ -0,0 +1,50 @@
import { v4 as uuidv4 } from 'uuid'
import { PostHog } from 'posthog-node'
import path from 'path'
import fs from 'fs'
import { getUserHome, getUserSettingsFilePath } from '.'
export class Telemetry {
postHog?: PostHog
constructor() {
if (process.env.DISABLE_FLOWISE_TELEMETRY !== 'true') {
this.postHog = new PostHog('phc_jEDuFYnOnuXsws986TLWzuisbRjwFqTl9JL8tDMgqme')
} else {
this.postHog = undefined
}
}
async id(): Promise<string> {
try {
const settingsContent = await fs.promises.readFile(getUserSettingsFilePath(), 'utf8')
const settings = JSON.parse(settingsContent)
return settings.instanceId
} catch (error) {
const instanceId = uuidv4()
const settings = {
instanceId
}
const defaultLocation = path.join(getUserHome(), '.flowise', 'settings.json')
await fs.promises.writeFile(defaultLocation, JSON.stringify(settings, null, 2))
return instanceId
}
}
async sendTelemetry(event: string, properties = {}): Promise<void> {
if (this.postHog) {
const distinctId = await this.id()
this.postHog.capture({
event,
distinctId,
properties
})
}
}
async flush(): Promise<void> {
if (this.postHog) {
await this.postHog.shutdownAsync()
}
}
}