feat: Add Arize & Phoenix Tracer Integration (#4046)

Added Arize Phoenix Tracer

Co-authored-by: Ilango <ilango.rajagopal@flowiseai.com>
This commit is contained in:
Ali Saleh
2025-02-24 19:41:30 +05:00
committed by GitHub
parent 542936c33f
commit 0e10952b45
13 changed files with 36241 additions and 35628 deletions
+447 -3
View File
@@ -5,8 +5,19 @@ import CallbackHandler from 'langfuse-langchain'
import lunary from 'lunary'
import { RunTree, RunTreeConfig, Client as LangsmithClient } from 'langsmith'
import { Langfuse, LangfuseTraceClient, LangfuseSpanClient, LangfuseGenerationClient } from 'langfuse'
import { LangChainInstrumentation } from '@arizeai/openinference-instrumentation-langchain'
import { Metadata } from '@grpc/grpc-js'
import opentelemetry, { Span, SpanStatusCode } from '@opentelemetry/api'
import { OTLPTraceExporter as GrpcOTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-grpc'
import { OTLPTraceExporter as ProtoOTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-proto'
import { registerInstrumentations } from '@opentelemetry/instrumentation'
import { Resource } from '@opentelemetry/resources'
import { SimpleSpanProcessor, Tracer } from '@opentelemetry/sdk-trace-base'
import { NodeTracerProvider } from '@opentelemetry/sdk-trace-node'
import { ATTR_SERVICE_NAME, ATTR_SERVICE_VERSION } from '@opentelemetry/semantic-conventions'
import { BaseCallbackHandler, NewTokenIndices, HandleLLMNewTokenCallbackFields } from '@langchain/core/callbacks/base'
import * as CallbackManagerModule from '@langchain/core/callbacks/manager'
import { LangChainTracer, LangChainTracerFields } from '@langchain/core/tracers/tracer_langchain'
import { BaseTracer, Run } from '@langchain/core/tracers/base'
import { ChainValues } from '@langchain/core/utils/types'
@@ -24,6 +35,91 @@ interface AgentRun extends Run {
actions: AgentAction[]
}
interface ArizeTracerOptions {
apiKey: string
spaceId: string
baseUrl: string
projectName: string
sdkIntegration?: string
sessionId?: string
enableCallback?: boolean
}
function getArizeTracer(options: ArizeTracerOptions): Tracer | undefined {
const SEMRESATTRS_PROJECT_NAME = 'openinference.project.name'
try {
const metadata = new Metadata()
metadata.set('api_key', options.apiKey)
metadata.set('space_id', options.spaceId)
const traceExporter = new GrpcOTLPTraceExporter({
url: `${options.baseUrl}/v1`,
metadata
})
const tracerProvider = new NodeTracerProvider({
resource: new Resource({
[ATTR_SERVICE_NAME]: options.projectName,
[ATTR_SERVICE_VERSION]: '1.0.0',
[SEMRESATTRS_PROJECT_NAME]: options.projectName,
model_id: options.projectName
})
})
tracerProvider.addSpanProcessor(new SimpleSpanProcessor(traceExporter))
if (options.enableCallback) {
registerInstrumentations({
instrumentations: []
})
const lcInstrumentation = new LangChainInstrumentation()
lcInstrumentation.manuallyInstrument(CallbackManagerModule)
tracerProvider.register()
}
return tracerProvider.getTracer(`arize-tracer-${uuidv4().toString()}`)
} catch (err) {
if (process.env.DEBUG === 'true') console.error(`Error setting up Arize tracer: ${err.message}`)
return undefined
}
}
interface PhoenixTracerOptions {
apiKey: string
baseUrl: string
projectName: string
sdkIntegration?: string
sessionId?: string
enableCallback?: boolean
}
function getPhoenixTracer(options: PhoenixTracerOptions): Tracer | undefined {
const SEMRESATTRS_PROJECT_NAME = 'openinference.project.name'
try {
const traceExporter = new ProtoOTLPTraceExporter({
url: `${options.baseUrl}/v1/traces`,
headers: {
api_key: options.apiKey
}
})
const tracerProvider = new NodeTracerProvider({
resource: new Resource({
[ATTR_SERVICE_NAME]: options.projectName,
[ATTR_SERVICE_VERSION]: '1.0.0',
[SEMRESATTRS_PROJECT_NAME]: options.projectName
})
})
tracerProvider.addSpanProcessor(new SimpleSpanProcessor(traceExporter))
if (options.enableCallback) {
registerInstrumentations({
instrumentations: []
})
const lcInstrumentation = new LangChainInstrumentation()
lcInstrumentation.manuallyInstrument(CallbackManagerModule)
tracerProvider.register()
}
return tracerProvider.getTracer(`phoenix-tracer-${uuidv4().toString()}`)
} catch (err) {
if (process.env.DEBUG === 'true') console.error(`Error setting up Phoenix tracer: ${err.message}`)
return undefined
}
}
function tryGetJsonSpaces() {
try {
return parseInt(getEnvironmentVariable('LOG_JSON_SPACES') ?? '2')
@@ -420,6 +516,48 @@ export const additionalCallbacks = async (nodeData: INodeData, options: ICommonO
const trace = langwatch.getTrace()
callbacks.push(trace.getLangChainCallback())
} else if (provider === 'arize') {
const arizeApiKey = getCredentialParam('arizeApiKey', credentialData, nodeData)
const arizeSpaceId = getCredentialParam('arizeSpaceId', credentialData, nodeData)
const arizeEndpoint = getCredentialParam('arizeEndpoint', credentialData, nodeData)
const arizeProject = analytic[provider].projectName as string
let arizeOptions: ArizeTracerOptions = {
apiKey: arizeApiKey,
spaceId: arizeSpaceId,
baseUrl: arizeEndpoint ?? 'https://otlp.arize.com',
projectName: arizeProject ?? 'default',
sdkIntegration: 'Flowise',
enableCallback: true
}
if (options.chatId) arizeOptions.sessionId = options.chatId
if (nodeData?.inputs?.analytics?.arize) {
arizeOptions = { ...arizeOptions, ...nodeData?.inputs?.analytics?.arize }
}
const tracer: Tracer | undefined = getArizeTracer(arizeOptions)
callbacks.push(tracer)
} else if (provider === 'phoenix') {
const phoenixApiKey = getCredentialParam('phoenixApiKey', credentialData, nodeData)
const phoenixEndpoint = getCredentialParam('phoenixEndpoint', credentialData, nodeData)
const phoenixProject = analytic[provider].projectName as string
let phoenixOptions: PhoenixTracerOptions = {
apiKey: phoenixApiKey,
baseUrl: phoenixEndpoint ?? 'https://app.phoenix.arize.com',
projectName: phoenixProject ?? 'default',
sdkIntegration: 'Flowise',
enableCallback: true
}
if (options.chatId) phoenixOptions.sessionId = options.chatId
if (nodeData?.inputs?.analytics?.phoenix) {
phoenixOptions = { ...phoenixOptions, ...nodeData?.inputs?.analytics?.phoenix }
}
const tracer: Tracer | undefined = getPhoenixTracer(phoenixOptions)
callbacks.push(tracer)
}
}
}
@@ -498,6 +636,42 @@ export class AnalyticHandler {
})
this.handlers['langWatch'] = { client: langwatch }
} else if (provider === 'arize') {
const arizeApiKey = getCredentialParam('arizeApiKey', credentialData, this.nodeData)
const arizeSpaceId = getCredentialParam('arizeSpaceId', credentialData, this.nodeData)
const arizeEndpoint = getCredentialParam('arizeEndpoint', credentialData, this.nodeData)
const arizeProject = analytic[provider].projectName as string
let arizeOptions: ArizeTracerOptions = {
apiKey: arizeApiKey,
spaceId: arizeSpaceId,
baseUrl: arizeEndpoint ?? 'https://otlp.arize.com',
projectName: arizeProject ?? 'default',
sdkIntegration: 'Flowise',
enableCallback: false
}
const arize: Tracer | undefined = getArizeTracer(arizeOptions)
const rootSpan: Span | undefined = undefined
this.handlers['arize'] = { client: arize, arizeProject, rootSpan }
} else if (provider === 'phoenix') {
const phoenixApiKey = getCredentialParam('phoenixApiKey', credentialData, this.nodeData)
const phoenixEndpoint = getCredentialParam('phoenixEndpoint', credentialData, this.nodeData)
const phoenixProject = analytic[provider].projectName as string
let phoenixOptions: PhoenixTracerOptions = {
apiKey: phoenixApiKey,
baseUrl: phoenixEndpoint ?? 'https://app.phoenix.arize.com',
projectName: phoenixProject ?? 'default',
sdkIntegration: 'Flowise',
enableCallback: false
}
const phoenix: Tracer | undefined = getPhoenixTracer(phoenixOptions)
const rootSpan: Span | undefined = undefined
this.handlers['phoenix'] = { client: phoenix, phoenixProject, rootSpan }
}
}
}
@@ -511,7 +685,9 @@ export class AnalyticHandler {
langSmith: {},
langFuse: {},
lunary: {},
langWatch: {}
langWatch: {},
arize: {},
phoenix: {}
}
if (Object.prototype.hasOwnProperty.call(this.handlers, 'langSmith')) {
@@ -625,6 +801,74 @@ export class AnalyticHandler {
}
}
if (Object.prototype.hasOwnProperty.call(this.handlers, 'arize')) {
const tracer: Tracer | undefined = this.handlers['arize'].client
let rootSpan: Span | undefined = this.handlers['arize'].rootSpan
if (!parentIds || !Object.keys(parentIds).length) {
rootSpan = tracer ? tracer.startSpan('Flowise') : undefined
if (rootSpan) {
rootSpan.setAttribute('session.id', this.options.chatId)
rootSpan.setAttribute('openinference.span.kind', 'CHAIN')
rootSpan.setAttribute('input.value', input)
rootSpan.setAttribute('input.mime_type', 'text/plain')
rootSpan.setAttribute('output.value', '[Object]')
rootSpan.setAttribute('output.mime_type', 'text/plain')
rootSpan.setStatus({ code: SpanStatusCode.OK })
rootSpan.end()
}
this.handlers['arize'].rootSpan = rootSpan
}
const rootSpanContext = rootSpan
? opentelemetry.trace.setSpan(opentelemetry.context.active(), rootSpan as Span)
: opentelemetry.context.active()
const chainSpan = tracer?.startSpan(name, undefined, rootSpanContext)
if (chainSpan) {
chainSpan.setAttribute('openinference.span.kind', 'CHAIN')
chainSpan.setAttribute('input.value', JSON.stringify(input))
chainSpan.setAttribute('input.mime_type', 'application/json')
}
const chainSpanId: any = chainSpan?.spanContext().spanId
this.handlers['arize'].chainSpan = { [chainSpanId]: chainSpan }
returnIds['arize'].chainSpan = chainSpanId
}
if (Object.prototype.hasOwnProperty.call(this.handlers, 'phoenix')) {
const tracer: Tracer | undefined = this.handlers['phoenix'].client
let rootSpan: Span | undefined = this.handlers['phoenix'].rootSpan
if (!parentIds || !Object.keys(parentIds).length) {
rootSpan = tracer ? tracer.startSpan('Flowise') : undefined
if (rootSpan) {
rootSpan.setAttribute('session.id', this.options.chatId)
rootSpan.setAttribute('openinference.span.kind', 'CHAIN')
rootSpan.setAttribute('input.value', input)
rootSpan.setAttribute('input.mime_type', 'text/plain')
rootSpan.setAttribute('output.value', '[Object]')
rootSpan.setAttribute('output.mime_type', 'text/plain')
rootSpan.setStatus({ code: SpanStatusCode.OK })
rootSpan.end()
}
this.handlers['phoenix'].rootSpan = rootSpan
}
const rootSpanContext = rootSpan
? opentelemetry.trace.setSpan(opentelemetry.context.active(), rootSpan as Span)
: opentelemetry.context.active()
const chainSpan = tracer?.startSpan(name, undefined, rootSpanContext)
if (chainSpan) {
chainSpan.setAttribute('openinference.span.kind', 'CHAIN')
chainSpan.setAttribute('input.value', JSON.stringify(input))
chainSpan.setAttribute('input.mime_type', 'application/json')
}
const chainSpanId: any = chainSpan?.spanContext().spanId
this.handlers['phoenix'].chainSpan = { [chainSpanId]: chainSpan }
returnIds['phoenix'].chainSpan = chainSpanId
}
return returnIds
}
@@ -682,6 +926,26 @@ export class AnalyticHandler {
})
}
}
if (Object.prototype.hasOwnProperty.call(this.handlers, 'arize')) {
const chainSpan: Span | undefined = this.handlers['arize'].chainSpan[returnIds['arize'].chainSpan]
if (chainSpan) {
chainSpan.setAttribute('output.value', JSON.stringify(output))
chainSpan.setAttribute('output.mime_type', 'application/json')
chainSpan.setStatus({ code: SpanStatusCode.OK })
chainSpan.end()
}
}
if (Object.prototype.hasOwnProperty.call(this.handlers, 'phoenix')) {
const chainSpan: Span | undefined = this.handlers['phoenix'].chainSpan[returnIds['phoenix'].chainSpan]
if (chainSpan) {
chainSpan.setAttribute('output.value', JSON.stringify(output))
chainSpan.setAttribute('output.mime_type', 'application/json')
chainSpan.setStatus({ code: SpanStatusCode.OK })
chainSpan.end()
}
}
}
async onChainError(returnIds: ICommonObject, error: string | object, shutdown = false) {
@@ -740,6 +1004,26 @@ export class AnalyticHandler {
})
}
}
if (Object.prototype.hasOwnProperty.call(this.handlers, 'arize')) {
const chainSpan: Span | undefined = this.handlers['arize'].chainSpan[returnIds['arize'].chainSpan]
if (chainSpan) {
chainSpan.setAttribute('error.value', JSON.stringify(error))
chainSpan.setAttribute('error.mime_type', 'application/json')
chainSpan.setStatus({ code: SpanStatusCode.ERROR, message: error.toString() })
chainSpan.end()
}
}
if (Object.prototype.hasOwnProperty.call(this.handlers, 'phoenix')) {
const chainSpan: Span | undefined = this.handlers['phoenix'].chainSpan[returnIds['phoenix'].chainSpan]
if (chainSpan) {
chainSpan.setAttribute('error.value', JSON.stringify(error))
chainSpan.setAttribute('error.mime_type', 'application/json')
chainSpan.setStatus({ code: SpanStatusCode.ERROR, message: error.toString() })
chainSpan.end()
}
}
}
async onLLMStart(name: string, input: string, parentIds: ICommonObject) {
@@ -747,7 +1031,9 @@ export class AnalyticHandler {
langSmith: {},
langFuse: {},
lunary: {},
langWatch: {}
langWatch: {},
arize: {},
phoenix: {}
}
if (Object.prototype.hasOwnProperty.call(this.handlers, 'langSmith')) {
@@ -807,6 +1093,44 @@ export class AnalyticHandler {
}
}
if (Object.prototype.hasOwnProperty.call(this.handlers, 'arize')) {
const tracer: Tracer | undefined = this.handlers['arize'].client
const rootSpan: Span | undefined = this.handlers['arize'].rootSpan
const rootSpanContext = rootSpan
? opentelemetry.trace.setSpan(opentelemetry.context.active(), rootSpan as Span)
: opentelemetry.context.active()
const llmSpan = tracer?.startSpan(name, undefined, rootSpanContext)
if (llmSpan) {
llmSpan.setAttribute('openinference.span.kind', 'LLM')
llmSpan.setAttribute('input.value', JSON.stringify(input))
llmSpan.setAttribute('input.mime_type', 'application/json')
}
const llmSpanId: any = llmSpan?.spanContext().spanId
this.handlers['arize'].llmSpan = { [llmSpanId]: llmSpan }
returnIds['arize'].llmSpan = llmSpanId
}
if (Object.prototype.hasOwnProperty.call(this.handlers, 'phoenix')) {
const tracer: Tracer | undefined = this.handlers['phoenix'].client
const rootSpan: Span | undefined = this.handlers['phoenix'].rootSpan
const rootSpanContext = rootSpan
? opentelemetry.trace.setSpan(opentelemetry.context.active(), rootSpan as Span)
: opentelemetry.context.active()
const llmSpan = tracer?.startSpan(name, undefined, rootSpanContext)
if (llmSpan) {
llmSpan.setAttribute('openinference.span.kind', 'LLM')
llmSpan.setAttribute('input.value', JSON.stringify(input))
llmSpan.setAttribute('input.mime_type', 'application/json')
}
const llmSpanId: any = llmSpan?.spanContext().spanId
this.handlers['phoenix'].llmSpan = { [llmSpanId]: llmSpan }
returnIds['phoenix'].llmSpan = llmSpanId
}
return returnIds
}
@@ -852,6 +1176,26 @@ export class AnalyticHandler {
})
}
}
if (Object.prototype.hasOwnProperty.call(this.handlers, 'arize')) {
const llmSpan: Span | undefined = this.handlers['arize'].llmSpan[returnIds['arize'].llmSpan]
if (llmSpan) {
llmSpan.setAttribute('output.value', JSON.stringify(output))
llmSpan.setAttribute('output.mime_type', 'application/json')
llmSpan.setStatus({ code: SpanStatusCode.OK })
llmSpan.end()
}
}
if (Object.prototype.hasOwnProperty.call(this.handlers, 'phoenix')) {
const llmSpan: Span | undefined = this.handlers['phoenix'].llmSpan[returnIds['phoenix'].llmSpan]
if (llmSpan) {
llmSpan.setAttribute('output.value', JSON.stringify(output))
llmSpan.setAttribute('output.mime_type', 'application/json')
llmSpan.setStatus({ code: SpanStatusCode.OK })
llmSpan.end()
}
}
}
async onLLMError(returnIds: ICommonObject, error: string | object) {
@@ -896,6 +1240,26 @@ export class AnalyticHandler {
})
}
}
if (Object.prototype.hasOwnProperty.call(this.handlers, 'arize')) {
const llmSpan: Span | undefined = this.handlers['arize'].llmSpan[returnIds['arize'].llmSpan]
if (llmSpan) {
llmSpan.setAttribute('error.value', JSON.stringify(error))
llmSpan.setAttribute('error.mime_type', 'application/json')
llmSpan.setStatus({ code: SpanStatusCode.ERROR, message: error.toString() })
llmSpan.end()
}
}
if (Object.prototype.hasOwnProperty.call(this.handlers, 'phoenix')) {
const llmSpan: Span | undefined = this.handlers['phoenix'].llmSpan[returnIds['phoenix'].llmSpan]
if (llmSpan) {
llmSpan.setAttribute('error.value', JSON.stringify(error))
llmSpan.setAttribute('error.mime_type', 'application/json')
llmSpan.setStatus({ code: SpanStatusCode.ERROR, message: error.toString() })
llmSpan.end()
}
}
}
async onToolStart(name: string, input: string | object, parentIds: ICommonObject) {
@@ -903,7 +1267,9 @@ export class AnalyticHandler {
langSmith: {},
langFuse: {},
lunary: {},
langWatch: {}
langWatch: {},
arize: {},
phoenix: {}
}
if (Object.prototype.hasOwnProperty.call(this.handlers, 'langSmith')) {
@@ -964,6 +1330,44 @@ export class AnalyticHandler {
}
}
if (Object.prototype.hasOwnProperty.call(this.handlers, 'arize')) {
const tracer: Tracer | undefined = this.handlers['arize'].client
const rootSpan: Span | undefined = this.handlers['arize'].rootSpan
const rootSpanContext = rootSpan
? opentelemetry.trace.setSpan(opentelemetry.context.active(), rootSpan as Span)
: opentelemetry.context.active()
const toolSpan = tracer?.startSpan(name, undefined, rootSpanContext)
if (toolSpan) {
toolSpan.setAttribute('openinference.span.kind', 'TOOL')
toolSpan.setAttribute('input.value', JSON.stringify(input))
toolSpan.setAttribute('input.mime_type', 'application/json')
}
const toolSpanId: any = toolSpan?.spanContext().spanId
this.handlers['arize'].toolSpan = { [toolSpanId]: toolSpan }
returnIds['arize'].toolSpan = toolSpanId
}
if (Object.prototype.hasOwnProperty.call(this.handlers, 'phoenix')) {
const tracer: Tracer | undefined = this.handlers['phoenix'].client
const rootSpan: Span | undefined = this.handlers['phoenix'].rootSpan
const rootSpanContext = rootSpan
? opentelemetry.trace.setSpan(opentelemetry.context.active(), rootSpan as Span)
: opentelemetry.context.active()
const toolSpan = tracer?.startSpan(name, undefined, rootSpanContext)
if (toolSpan) {
toolSpan.setAttribute('openinference.span.kind', 'TOOL')
toolSpan.setAttribute('input.value', JSON.stringify(input))
toolSpan.setAttribute('input.mime_type', 'application/json')
}
const toolSpanId: any = toolSpan?.spanContext().spanId
this.handlers['phoenix'].toolSpan = { [toolSpanId]: toolSpan }
returnIds['phoenix'].toolSpan = toolSpanId
}
return returnIds
}
@@ -1009,6 +1413,26 @@ export class AnalyticHandler {
})
}
}
if (Object.prototype.hasOwnProperty.call(this.handlers, 'arize')) {
const toolSpan: Span | undefined = this.handlers['arize'].toolSpan[returnIds['arize'].toolSpan]
if (toolSpan) {
toolSpan.setAttribute('output.value', JSON.stringify(output))
toolSpan.setAttribute('output.mime_type', 'application/json')
toolSpan.setStatus({ code: SpanStatusCode.OK })
toolSpan.end()
}
}
if (Object.prototype.hasOwnProperty.call(this.handlers, 'phoenix')) {
const toolSpan: Span | undefined = this.handlers['phoenix'].toolSpan[returnIds['phoenix'].toolSpan]
if (toolSpan) {
toolSpan.setAttribute('output.value', JSON.stringify(output))
toolSpan.setAttribute('output.mime_type', 'application/json')
toolSpan.setStatus({ code: SpanStatusCode.OK })
toolSpan.end()
}
}
}
async onToolError(returnIds: ICommonObject, error: string | object) {
@@ -1053,5 +1477,25 @@ export class AnalyticHandler {
})
}
}
if (Object.prototype.hasOwnProperty.call(this.handlers, 'arize')) {
const toolSpan: Span | undefined = this.handlers['arize'].toolSpan[returnIds['arize'].toolSpan]
if (toolSpan) {
toolSpan.setAttribute('error.value', JSON.stringify(error))
toolSpan.setAttribute('error.mime_type', 'application/json')
toolSpan.setStatus({ code: SpanStatusCode.ERROR, message: error.toString() })
toolSpan.end()
}
}
if (Object.prototype.hasOwnProperty.call(this.handlers, 'phoenix')) {
const toolSpan: Span | undefined = this.handlers['phoenix'].toolSpan[returnIds['phoenix'].toolSpan]
if (toolSpan) {
toolSpan.setAttribute('error.value', JSON.stringify(error))
toolSpan.setAttribute('error.mime_type', 'application/json')
toolSpan.setStatus({ code: SpanStatusCode.ERROR, message: error.toString() })
toolSpan.end()
}
}
}
}