LunaryAI automatic Thread and User tracking (#3233)

* Lunary Thread/User tracking

* Clean console logs

* Clean

* Remove commented lines

* Remove commented line
This commit is contained in:
Vincelwt
2024-09-26 12:01:33 +01:00
committed by GitHub
parent 8690c43ef5
commit 18f916a7e1
7 changed files with 32683 additions and 32568 deletions
+86 -11
View File
@@ -14,8 +14,9 @@ import { AgentAction } from '@langchain/core/agents'
import { LunaryHandler } from '@langchain/community/callbacks/handlers/lunary'
import { getCredentialData, getCredentialParam, getEnvironmentVariable } from './utils'
import { ICommonObject, INodeData, IServerSideEventStreamer } from './Interface'
import { ICommonObject, IDatabaseEntity, INodeData, IServerSideEventStreamer } from './Interface'
import { LangWatch, LangWatchSpan, LangWatchTrace, autoconvertTypedValues } from 'langwatch'
import { DataSource } from 'typeorm'
interface AgentRun extends Run {
actions: AgentAction[]
@@ -90,6 +91,7 @@ export class ConsoleCallbackHandler extends BaseTracer {
onChainStart(run: Run) {
const crumbs = this.getBreadcrumbs(run)
this.logger.verbose(`[chain/start] [${crumbs}] Entering Chain run with input: ${tryJsonStringify(run.inputs, '[inputs]')}`)
}
@@ -235,6 +237,78 @@ export class CustomChainHandler extends BaseCallbackHandler {
}
}
class ExtendedLunaryHandler extends LunaryHandler {
chatId: string
appDataSource: DataSource
databaseEntities: IDatabaseEntity
currentRunId: string | null
thread: any
constructor({ flowiseOptions, ...options }: any) {
super(options)
this.appDataSource = flowiseOptions.appDataSource
this.databaseEntities = flowiseOptions.databaseEntities
this.chatId = flowiseOptions.chatId
}
async initThread() {
const entity = await this.appDataSource.getRepository(this.databaseEntities['Lead']).findOne({
where: {
chatId: this.chatId
}
})
this.thread = lunary.openThread({
id: this.chatId,
userId: entity?.email ?? entity?.id,
userProps: {
name: entity?.name ?? undefined,
email: entity?.email ?? undefined,
phone: entity?.phone ?? undefined
}
})
}
async handleChainStart(chain: any, inputs: any, runId: string, parentRunId?: string, tags?: string[], metadata?: any): Promise<void> {
// First chain (no parent run id) is the user message
if (this.chatId && !parentRunId) {
if (!this.thread) {
await this.initThread()
}
const messageText = inputs.input
const messageId = this.thread.trackMessage({
content: messageText,
role: 'user'
})
// Track top level chain id for knowing when we got the final reply
this.currentRunId = runId
// Use the messageId as the parent of the chain for reconciliation
super.handleChainStart(chain, inputs, runId, messageId, tags, metadata)
} else {
super.handleChainStart(chain, inputs, runId, parentRunId, tags, metadata)
}
}
async handleChainEnd(outputs: ChainValues, runId: string): Promise<void> {
if (this.chatId && runId === this.currentRunId) {
const answer = outputs.output
this.thread.trackMessage({
content: answer,
role: 'assistant'
})
this.currentRunId = null
}
super.handleChainEnd(outputs, runId)
}
}
export const additionalCallbacks = async (nodeData: INodeData, options: ICommonObject) => {
try {
if (!options.analytic) return []
@@ -293,19 +367,22 @@ export const additionalCallbacks = async (nodeData: INodeData, options: ICommonO
const handler = new CallbackHandler(langFuseOptions)
callbacks.push(handler)
} else if (provider === 'lunary') {
const lunaryAppId = getCredentialParam('lunaryAppId', credentialData, nodeData)
const lunaryPublicKey = getCredentialParam('lunaryAppId', credentialData, nodeData)
const lunaryEndpoint = getCredentialParam('lunaryEndpoint', credentialData, nodeData)
let lunaryFields = {
appId: lunaryAppId,
apiUrl: lunaryEndpoint ?? 'https://app.lunary.ai'
publicKey: lunaryPublicKey,
apiUrl: lunaryEndpoint ?? 'https://api.lunary.ai',
runtime: 'flowise',
flowiseOptions: options
}
if (nodeData?.inputs?.analytics?.lunary) {
lunaryFields = { ...lunaryFields, ...nodeData?.inputs?.analytics?.lunary }
}
const handler = new LunaryHandler(lunaryFields)
const handler = new ExtendedLunaryHandler(lunaryFields)
callbacks.push(handler)
} else if (provider === 'langWatch') {
const langWatchApiKey = getCredentialParam('langWatchApiKey', credentialData, nodeData)
@@ -376,12 +453,13 @@ export class AnalyticHandler {
})
this.handlers['langFuse'] = { client: langfuse }
} else if (provider === 'lunary') {
const lunaryAppId = getCredentialParam('lunaryAppId', credentialData, this.nodeData)
const lunaryPublicKey = getCredentialParam('lunaryAppId', credentialData, this.nodeData)
const lunaryEndpoint = getCredentialParam('lunaryEndpoint', credentialData, this.nodeData)
lunary.init({
appId: lunaryAppId,
apiUrl: lunaryEndpoint
publicKey: lunaryPublicKey,
apiUrl: lunaryEndpoint,
runtime: 'flowise'
})
this.handlers['lunary'] = { client: lunary }
@@ -487,7 +565,6 @@ export class AnalyticHandler {
await monitor.trackEvent('chain', 'start', {
runId,
name,
userId: this.options.chatId,
input,
...this.nodeData?.inputs?.analytics?.lunary
})
@@ -686,7 +763,6 @@ export class AnalyticHandler {
runId,
parentRunId: chainEventId,
name,
userId: this.options.chatId,
input
})
this.handlers['lunary'].llmEvent = { [runId]: runId }
@@ -843,7 +919,6 @@ export class AnalyticHandler {
runId,
parentRunId: chainEventId,
name,
userId: this.options.chatId,
input
})
this.handlers['lunary'].toolEvent = { [runId]: runId }