Merge pull request #1563 from FlowiseAI/feature/PostHog

Feature/PostHog
This commit is contained in:
Henry Heng
2024-01-19 12:13:38 +00:00
committed by GitHub
10 changed files with 162 additions and 3 deletions
+1
View File
@@ -138,6 +138,7 @@ Flowise 支持不同的环境变量来配置您的实例。您可以在 `package
| DATABASE_NAME | 数据库名称(当 DATABASE_TYPE 不是 sqlite 时) | 字符串 | |
| SECRETKEY_PATH | 保存加密密钥(用于加密/解密凭据)的位置 | 字符串 | `your-path/Flowise/packages/server` |
| FLOWISE_SECRETKEY_OVERWRITE | 加密密钥用于替代存储在 SECRETKEY_PATH 中的密钥 | 字符串 |
| DISABLE_FLOWISE_TELEMETRY | 关闭遥测 | 字符串 |
您也可以在使用 `npx` 时指定环境变量。例如:
+1
View File
@@ -141,6 +141,7 @@ Flowise support different environment variables to configure your instance. You
| DATABASE_SSL | Database connection overssl (When DATABASE_TYPE is postgre) | Boolean | false |
| SECRETKEY_PATH | Location where encryption key (used to encrypt/decrypt credentials) is saved | String | `your-path/Flowise/packages/server` |
| FLOWISE_SECRETKEY_OVERWRITE | Encryption key to be used instead of the key stored in SECRETKEY_PATH | String |
| DISABLE_FLOWISE_TELEMETRY | Turn off telemetry | Boolean |
You can also specify the env variables when using `npx`. For example:
+3 -1
View File
@@ -25,4 +25,6 @@ LOG_PATH=/root/.flowise/logs
# LANGCHAIN_TRACING_V2=true
# LANGCHAIN_ENDPOINT=https://api.smith.langchain.com
# LANGCHAIN_API_KEY=your_api_key
# LANGCHAIN_PROJECT=your_project
# LANGCHAIN_PROJECT=your_project
# DISABLE_FLOWISE_TELEMETRY=true
+1
View File
@@ -22,6 +22,7 @@ services:
- FLOWISE_SECRETKEY_OVERWRITE=${FLOWISE_SECRETKEY_OVERWRITE}
- LOG_LEVEL=${LOG_LEVEL}
- LOG_PATH=${LOG_PATH}
- DISABLE_FLOWISE_TELEMETRY=${DISABLE_FLOWISE_TELEMETRY}
ports:
- '${PORT}:${PORT}'
volumes:
+2
View File
@@ -26,3 +26,5 @@ PORT=3000
# LANGCHAIN_ENDPOINT=https://api.smith.langchain.com
# LANGCHAIN_API_KEY=your_api_key
# LANGCHAIN_PROJECT=your_project
# DISABLE_FLOWISE_TELEMETRY=true
+1
View File
@@ -60,6 +60,7 @@
"multer": "^1.4.5-lts.1",
"mysql": "^2.18.1",
"pg": "^8.11.1",
"posthog-node": "^3.5.0",
"reflect-metadata": "^0.1.13",
"sanitize-html": "^2.11.0",
"socket.io": "^4.6.1",
+5 -1
View File
@@ -39,7 +39,8 @@ export default class Start extends Command {
LANGCHAIN_TRACING_V2: Flags.string(),
LANGCHAIN_ENDPOINT: Flags.string(),
LANGCHAIN_API_KEY: Flags.string(),
LANGCHAIN_PROJECT: Flags.string()
LANGCHAIN_PROJECT: Flags.string(),
DISABLE_FLOWISE_TELEMETRY: Flags.string()
}
async stopProcess() {
@@ -113,6 +114,9 @@ export default class Start extends Command {
if (flags.LANGCHAIN_API_KEY) process.env.LANGCHAIN_API_KEY = flags.LANGCHAIN_API_KEY
if (flags.LANGCHAIN_PROJECT) process.env.LANGCHAIN_PROJECT = flags.LANGCHAIN_PROJECT
// Telemetry
if (flags.DISABLE_FLOWISE_TELEMETRY) process.env.DISABLE_FLOWISE_TELEMETRY = flags.DISABLE_FLOWISE_TELEMETRY
await (async () => {
try {
logger.info('Starting Flowise...')
+42 -1
View File
@@ -45,7 +45,9 @@ import {
getSessionChatHistory,
getAllConnectedNodes,
clearSessionMemory,
findMemoryNode
findMemoryNode,
getTelemetryFlowObj,
getAppVersion
} from './utils'
import { cloneDeep, omit, uniqWith, isEqual } from 'lodash'
import { getDataSource } from './DataSource'
@@ -64,6 +66,7 @@ import { sanitizeMiddleware } from './utils/XSS'
import axios from 'axios'
import { Client } from 'langchainhub'
import { parsePrompt } from './utils/hub'
import { Telemetry } from './utils/telemetry'
import { Variable } from './database/entities/Variable'
export class App {
@@ -71,6 +74,7 @@ export class App {
nodesPool: NodesPool
chatflowPool: ChatflowPool
cachePool: CachePool
telemetry: Telemetry
AppDataSource = getDataSource()
constructor() {
@@ -105,6 +109,9 @@ export class App {
// Initialize cache pool
this.cachePool = new CachePool()
// Initialize telemetry
this.telemetry = new Telemetry()
})
.catch((err) => {
logger.error('❌ [server]: Error during Data Source initialization:', err)
@@ -388,6 +395,12 @@ export class App {
const chatflow = this.AppDataSource.getRepository(ChatFlow).create(newChatFlow)
const results = await this.AppDataSource.getRepository(ChatFlow).save(chatflow)
await this.telemetry.sendTelemetry('chatflow_created', {
version: await getAppVersion(),
chatlowId: results.id,
flowGraph: getTelemetryFlowObj(JSON.parse(results.flowData)?.nodes, JSON.parse(results.flowData)?.edges)
})
return res.json(results)
})
@@ -674,6 +687,12 @@ export class App {
const tool = this.AppDataSource.getRepository(Tool).create(newTool)
const results = await this.AppDataSource.getRepository(Tool).save(tool)
await this.telemetry.sendTelemetry('tool_created', {
version: await getAppVersion(),
toolId: results.id,
toolName: results.name
})
return res.json(results)
})
@@ -880,6 +899,11 @@ export class App {
const assistant = this.AppDataSource.getRepository(Assistant).create(newAssistant)
const results = await this.AppDataSource.getRepository(Assistant).save(assistant)
await this.telemetry.sendTelemetry('assistant_created', {
version: await getAppVersion(),
assistantId: results.id
})
return res.json(results)
})
@@ -1508,6 +1532,15 @@ export class App {
const startingNodes = nodes.filter((nd) => startingNodeIds.includes(nd.data.id))
this.chatflowPool.add(chatflowid, undefined, startingNodes, incomingInput?.overrideConfig)
await this.telemetry.sendTelemetry('vector_upserted', {
version: await getAppVersion(),
chatlowId: chatflowid,
type: isInternal ? chatType.INTERNAL : chatType.EXTERNAL,
flowGraph: getTelemetryFlowObj(nodes, edges),
stopNodeId
})
return res.status(201).send('Successfully Upserted')
} catch (e: any) {
logger.error('[server]: Error:', e)
@@ -1784,6 +1817,13 @@ export class App {
await this.addChatMessage(apiMessage)
logger.debug(`[server]: Finished running ${nodeToExecuteData.label} (${nodeToExecuteData.id})`)
await this.telemetry.sendTelemetry('prediction_sent', {
version: await getAppVersion(),
chatlowId: chatflowid,
chatId,
type: isInternal ? chatType.INTERNAL : chatType.EXTERNAL,
flowGraph: getTelemetryFlowObj(nodes, edges)
})
// Only return ChatId when its Internal OR incoming input has ChatId, to avoid confusion when calling API
if (incomingInput.chatId || isInternal) result.chatId = chatId
@@ -1798,6 +1838,7 @@ export class App {
async stopApp() {
try {
const removePromises: any[] = []
removePromises.push(this.telemetry.flush())
await Promise.all(removePromises)
} catch (e) {
logger.error(`❌[server]: Flowise Server shut down error: ${e}`)
+56
View File
@@ -1082,3 +1082,59 @@ export const getAllValuesFromJson = (obj: any): any[] => {
extractValues(obj)
return values
}
/**
* Get only essential flow data items for telemetry
* @param {IReactFlowNode[]} nodes
* @param {IReactFlowEdge[]} edges
*/
export const getTelemetryFlowObj = (nodes: IReactFlowNode[], edges: IReactFlowEdge[]) => {
const nodeData = nodes.map((node) => node.id)
const edgeData = edges.map((edge) => ({ source: edge.source, target: edge.target }))
return { nodes: nodeData, edges: edgeData }
}
/**
* Get user settings file
* TODO: move env variables to settings json file, easier configuration
*/
export const getUserSettingsFilePath = () => {
const checkPaths = [path.join(getUserHome(), '.flowise', 'settings.json')]
for (const checkPath of checkPaths) {
if (fs.existsSync(checkPath)) {
return checkPath
}
}
return ''
}
/**
* Get app current version
*/
export const getAppVersion = async () => {
const getPackageJsonPath = (): string => {
const checkPaths = [
path.join(__dirname, '..', 'package.json'),
path.join(__dirname, '..', '..', 'package.json'),
path.join(__dirname, '..', '..', '..', 'package.json'),
path.join(__dirname, '..', '..', '..', '..', 'package.json'),
path.join(__dirname, '..', '..', '..', '..', '..', 'package.json')
]
for (const checkPath of checkPaths) {
if (fs.existsSync(checkPath)) {
return checkPath
}
}
return ''
}
const packagejsonPath = getPackageJsonPath()
if (!packagejsonPath) return ''
try {
const content = await fs.promises.readFile(packagejsonPath, 'utf8')
const parsedContent = JSON.parse(content)
return parsedContent.version
} catch (error) {
return ''
}
}
+50
View File
@@ -0,0 +1,50 @@
import { v4 as uuidv4 } from 'uuid'
import { PostHog } from 'posthog-node'
import path from 'path'
import fs from 'fs'
import { getUserHome, getUserSettingsFilePath } from '.'
export class Telemetry {
postHog?: PostHog
constructor() {
if (process.env.DISABLE_FLOWISE_TELEMETRY !== 'true') {
this.postHog = new PostHog('phc_jEDuFYnOnuXsws986TLWzuisbRjwFqTl9JL8tDMgqme')
} else {
this.postHog = undefined
}
}
async id(): Promise<string> {
try {
const settingsContent = await fs.promises.readFile(getUserSettingsFilePath(), 'utf8')
const settings = JSON.parse(settingsContent)
return settings.instanceId
} catch (error) {
const instanceId = uuidv4()
const settings = {
instanceId
}
const defaultLocation = path.join(getUserHome(), '.flowise', 'settings.json')
await fs.promises.writeFile(defaultLocation, JSON.stringify(settings, null, 2))
return instanceId
}
}
async sendTelemetry(event: string, properties = {}): Promise<void> {
if (this.postHog) {
const distinctId = await this.id()
this.postHog.capture({
event,
distinctId,
properties
})
}
}
async flush(): Promise<void> {
if (this.postHog) {
await this.postHog.shutdownAsync()
}
}
}