diff --git a/packages/server/src/metrics/OpenTelemetry.ts b/packages/server/src/metrics/OpenTelemetry.ts index 7686225d..a9a3d9c4 100644 --- a/packages/server/src/metrics/OpenTelemetry.ts +++ b/packages/server/src/metrics/OpenTelemetry.ts @@ -6,6 +6,9 @@ import { diag, DiagLogLevel, DiagConsoleLogger, Attributes, Counter } from '@ope import { getVersion } from 'flowise-components' import express from 'express' +// Create a static map to track created metrics and prevent duplicates +const createdMetrics = new Map() + export class OpenTelemetry implements IMetricsProvider { private app: express.Application private resource: Resource @@ -30,6 +33,9 @@ export class OpenTelemetry implements IMetricsProvider { if (process.env.METRICS_OPEN_TELEMETRY_DEBUG === 'true') { diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.DEBUG) } + + // Clear metrics tracking on new instance + createdMetrics.clear() } public getName(): string { @@ -37,121 +43,215 @@ export class OpenTelemetry implements IMetricsProvider { } async initializeCounters(): Promise { - // Define the resource with the service name for trace grouping - const flowiseVersion = await getVersion() + try { + // Define the resource with the service name for trace grouping + const flowiseVersion = await getVersion() - this.resource = new Resource({ - [ATTR_SERVICE_NAME]: process.env.METRICS_SERVICE_NAME || 'FlowiseAI', - [ATTR_SERVICE_VERSION]: flowiseVersion.version // Version as a label - }) + this.resource = new Resource({ + [ATTR_SERVICE_NAME]: process.env.METRICS_SERVICE_NAME || 'FlowiseAI', + [ATTR_SERVICE_VERSION]: flowiseVersion.version // Version as a label + }) - const metricProtocol = process.env.METRICS_OPEN_TELEMETRY_PROTOCOL || 'http' // Default to 'http' - // Conditionally import the correct OTLP exporters based on protocol - let OTLPMetricExporter - if (metricProtocol === 'http') { - OTLPMetricExporter = require('@opentelemetry/exporter-metrics-otlp-http').OTLPMetricExporter - } else if (metricProtocol === 'grpc') { - OTLPMetricExporter = require('@opentelemetry/exporter-metrics-otlp-grpc').OTLPMetricExporter - } else if (metricProtocol === 'proto') { - OTLPMetricExporter = require('@opentelemetry/exporter-metrics-otlp-proto').OTLPMetricExporter - } else { - console.error('Invalid METRICS_OPEN_TELEMETRY_PROTOCOL specified. Please set it to "http", "grpc", or "proto".') - process.exit(1) // Exit if invalid protocol type is specified + const metricProtocol = process.env.METRICS_OPEN_TELEMETRY_PROTOCOL || 'http' // Default to 'http' + // Conditionally import the correct OTLP exporters based on protocol + let OTLPMetricExporter + if (metricProtocol === 'http') { + OTLPMetricExporter = require('@opentelemetry/exporter-metrics-otlp-http').OTLPMetricExporter + } else if (metricProtocol === 'grpc') { + OTLPMetricExporter = require('@opentelemetry/exporter-metrics-otlp-grpc').OTLPMetricExporter + } else if (metricProtocol === 'proto') { + OTLPMetricExporter = require('@opentelemetry/exporter-metrics-otlp-proto').OTLPMetricExporter + } else { + console.error('Invalid METRICS_OPEN_TELEMETRY_PROTOCOL specified. Please set it to "http", "grpc", or "proto".') + process.exit(1) // Exit if invalid protocol type is specified + } + + // Handle any existing metric exporter + if (this.otlpMetricExporter) { + try { + await this.otlpMetricExporter.shutdown() + } catch (error) { + // Ignore shutdown errors + } + } + + this.otlpMetricExporter = new OTLPMetricExporter({ + url: process.env.METRICS_OPEN_TELEMETRY_METRIC_ENDPOINT // OTLP endpoint for metrics + }) + + // Clean up any existing metric reader + if (this.metricReader) { + try { + await this.metricReader.shutdown() + } catch (error) { + // Ignore shutdown errors + } + } + + this.metricReader = new PeriodicExportingMetricReader({ + exporter: this.otlpMetricExporter, + exportIntervalMillis: 5000 // Export metrics every 5 seconds + }) + + // Clean up any existing meter provider + if (this.meterProvider) { + try { + await this.meterProvider.shutdown() + } catch (error) { + // Ignore shutdown errors + } + } + + this.meterProvider = new MeterProvider({ resource: this.resource, readers: [this.metricReader] }) + + const meter = this.meterProvider.getMeter('flowise-metrics') + // look at the FLOWISE_COUNTER enum in Interface.Metrics.ts and get all values + // for each counter in the enum, create a new promClient.Counter and add it to the registry + const enumEntries = Object.entries(FLOWISE_METRIC_COUNTERS) + enumEntries.forEach(([name, value]) => { + try { + // Check if we've already created this metric + if (!createdMetrics.has(value)) { + // derive proper counter name from the enum value (chatflow_created = Chatflow Created) + const properCounterName: string = name.replace(/_/g, ' ').replace(/\b\w/g, (l) => l.toUpperCase()) + this.counters.set( + value, + meter.createCounter(value, { + description: properCounterName + }) + ) + createdMetrics.set(value, true) + } + } catch (error) { + // Log error but continue with other metrics + console.error(`Error creating metric ${value}:`, error) + } + }) + + try { + // Add version gauge if not already created + if (!createdMetrics.has('flowise_version')) { + const versionGuage = meter.createGauge('flowise_version', { + description: 'Flowise version' + }) + // remove the last dot from the version string, e.g. 2.1.3 -> 2.13 (gauge needs a number - float) + const formattedVersion = flowiseVersion.version.replace(/\.(\d+)$/, '$1') + versionGuage.record(parseFloat(formattedVersion)) + createdMetrics.set('flowise_version', true) + } + } catch (error) { + console.error('Error creating version gauge:', error) + } + + try { + // HTTP requests counter + if (!createdMetrics.has('http_requests_total')) { + this.httpRequestCounter = meter.createCounter('http_requests_total', { + description: 'Counts the number of HTTP requests received' + }) + createdMetrics.set('http_requests_total', true) + } + } catch (error) { + console.error('Error creating HTTP request counter:', error) + } + + try { + // HTTP request duration histogram + if (!createdMetrics.has('http_request_duration_ms')) { + this.httpRequestDuration = meter.createHistogram('http_request_duration_ms', { + description: 'Records the duration of HTTP requests in ms' + }) + createdMetrics.set('http_request_duration_ms', true) + } + } catch (error) { + console.error('Error creating HTTP request duration histogram:', error) + } + + await this.setupMetricsEndpoint() + } catch (error) { + console.error('Error initializing OpenTelemetry metrics:', error) + // Don't throw - allow app to continue without metrics } - - this.otlpMetricExporter = new OTLPMetricExporter({ - url: process.env.METRICS_OPEN_TELEMETRY_METRIC_ENDPOINT // OTLP endpoint for metrics - }) - - this.metricReader = new PeriodicExportingMetricReader({ - exporter: this.otlpMetricExporter, - exportIntervalMillis: 5000 // Export metrics every 5 seconds - }) - this.meterProvider = new MeterProvider({ resource: this.resource, readers: [this.metricReader] }) - - const meter = this.meterProvider.getMeter('flowise-metrics') - // look at the FLOWISE_COUNTER enum in Interface.Metrics.ts and get all values - // for each counter in the enum, create a new promClient.Counter and add it to the registry - const enumEntries = Object.entries(FLOWISE_METRIC_COUNTERS) - enumEntries.forEach(([name, value]) => { - // derive proper counter name from the enum value (chatflow_created = Chatflow Created) - const properCounterName: string = name.replace(/_/g, ' ').replace(/\b\w/g, (l) => l.toUpperCase()) - this.counters.set( - value, - meter.createCounter(value, { - description: properCounterName - }) - ) - }) - - // in addition to the enum counters, add a few more custom counters - - const versionGuage = meter.createGauge('flowise_version', { - description: 'Flowise version' - }) - // remove the last dot from the version string, e.g. 2.1.3 -> 2.13 (guage needs a number - float) - const formattedVersion = flowiseVersion.version.replace(/\.(\d+)$/, '$1') - versionGuage.record(parseFloat(formattedVersion)) - - // Counter for HTTP requests with method, path, and status as labels - this.httpRequestCounter = meter.createCounter('http_requests_total', { - description: 'Counts the number of HTTP requests received' - }) - - // Histogram to measure HTTP request duration in milliseconds - this.httpRequestDuration = meter.createHistogram('http_request_duration_ms', { - description: 'Records the duration of HTTP requests in ms' - }) } // Function to record HTTP request duration private recordHttpRequestDuration(durationMs: number, method: string, path: string, status: number) { - this.httpRequestDuration.record(durationMs, { - method, - path, - status: status.toString() - }) + try { + if (this.httpRequestDuration) { + this.httpRequestDuration.record(durationMs, { + method, + path, + status: status.toString() + }) + } + } catch (error) { + // Log error but don't crash the application + console.error('Error recording HTTP request duration:', error) + } } // Function to record HTTP requests with specific labels private recordHttpRequest(method: string, path: string, status: number) { - this.httpRequestCounter.add(1, { - method, - path, - status: status.toString() - }) + try { + if (this.httpRequestCounter) { + this.httpRequestCounter.add(1, { + method, + path, + status: status.toString() + }) + } + } catch (error) { + // Log error but don't crash the application + console.error('Error recording HTTP request:', error) + } } async setupMetricsEndpoint(): Promise { - // Graceful shutdown for telemetry data flushing - process.on('SIGTERM', async () => { - await this.metricReader.shutdown() - await this.meterProvider.shutdown() - }) - - // Runs before each requests - this.app.use((req, res, next) => { - res.locals.startEpoch = Date.now() - next() - }) - - // Runs after each requests - this.app.use((req, res, next) => { - res.on('finish', async () => { - if (res.locals.startEpoch) { - const responseTimeInMs = Date.now() - res.locals.startEpoch - this.recordHttpRequest(req.method, req.path, res.statusCode) - this.recordHttpRequestDuration(responseTimeInMs, req.method, req.path, res.statusCode) + try { + // Graceful shutdown for telemetry data flushing + process.on('SIGTERM', async () => { + try { + if (this.metricReader) await this.metricReader.shutdown() + if (this.meterProvider) await this.meterProvider.shutdown() + } catch (error) { + console.error('Error during metrics shutdown:', error) } }) - next() - }) + + // Runs before each requests + this.app.use((req, res, next) => { + res.locals.startEpoch = Date.now() + next() + }) + + // Runs after each requests + this.app.use((req, res, next) => { + res.on('finish', async () => { + try { + if (res.locals.startEpoch) { + const responseTimeInMs = Date.now() - res.locals.startEpoch + this.recordHttpRequest(req.method, req.path, res.statusCode) + this.recordHttpRequestDuration(responseTimeInMs, req.method, req.path, res.statusCode) + } + } catch (error) { + console.error('Error in metrics middleware:', error) + } + }) + next() + }) + } catch (error) { + console.error('Error setting up metrics endpoint:', error) + } } async incrementCounter(counter: string, payload: any): Promise { - // Increment OpenTelemetry counter with the payload - if (this.counters.has(counter)) { - ;(this.counters.get(counter) as Counter).add(1, payload) + try { + // Increment OpenTelemetry counter with the payload + if (this.counters.has(counter)) { + ;(this.counters.get(counter) as Counter).add(1, payload) + } + } catch (error) { + console.error(`Error incrementing counter ${counter}:`, error) } } } diff --git a/packages/server/src/metrics/Prometheus.ts b/packages/server/src/metrics/Prometheus.ts index 15eaafea..56b4da3f 100644 --- a/packages/server/src/metrics/Prometheus.ts +++ b/packages/server/src/metrics/Prometheus.ts @@ -12,6 +12,9 @@ export class Prometheus implements IMetricsProvider { constructor(app: express.Application) { this.app = app + // Clear any existing default registry metrics to avoid conflicts + promClient.register.clear() + // Create a separate registry for our metrics this.register = new promClient.Registry() } @@ -27,48 +30,87 @@ export class Prometheus implements IMetricsProvider { // look at the FLOWISE_COUNTER enum in Interface.Metrics.ts and get all values // for each counter in the enum, create a new promClient.Counter and add it to the registry - this.counters = new Map>() + this.counters = new Map | promClient.Gauge | promClient.Histogram>() const enumEntries = Object.entries(FLOWISE_METRIC_COUNTERS) enumEntries.forEach(([name, value]) => { // derive proper counter name from the enum value (chatflow_created = Chatflow Created) const properCounterName: string = name.replace(/_/g, ' ').replace(/\b\w/g, (l) => l.toUpperCase()) - this.counters.set( - value, - new promClient.Counter({ - name: value, - help: `Total number of ${properCounterName}`, - labelNames: ['status'] - }) - ) + try { + this.counters.set( + value, + new promClient.Counter({ + name: value, + help: `Total number of ${properCounterName}`, + labelNames: ['status'], + registers: [this.register] // Explicitly set the registry + }) + ) + } catch (error) { + // If metric already exists, get it from the registry instead + const existingMetrics = this.register.getSingleMetric(value) + if (existingMetrics) { + this.counters.set(value, existingMetrics as promClient.Counter) + } + } }) // in addition to the enum counters, add a few more custom counters // version, http_request_duration_ms, http_requests_total - const versionGaugeCounter = new promClient.Gauge({ - name: 'flowise_version_info', - help: 'Flowise version info.', - labelNames: ['version'] - }) + try { + const versionGaugeCounter = new promClient.Gauge({ + name: 'flowise_version_info', + help: 'Flowise version info.', + labelNames: ['version'], + registers: [this.register] // Explicitly set the registry + }) - const { version } = await getVersion() - versionGaugeCounter.set({ version: 'v' + version }, 1) - this.counters.set('flowise_version', versionGaugeCounter) + const { version } = await getVersion() + versionGaugeCounter.set({ version: 'v' + version }, 1) + this.counters.set('flowise_version', versionGaugeCounter) + } catch (error) { + // If metric already exists, get it from the registry + const existingMetric = this.register.getSingleMetric('flowise_version') + if (existingMetric) { + this.counters.set('flowise_version', existingMetric as promClient.Gauge) + } + } - this.httpRequestDurationMicroseconds = new promClient.Histogram({ - name: 'http_request_duration_ms', - help: 'Duration of HTTP requests in ms', - labelNames: ['method', 'route', 'code'], - buckets: [1, 5, 15, 50, 100, 200, 300, 400, 500] // buckets for response time from 0.1ms to 500ms - }) - this.counters.set('http_request_duration_ms', this.httpRequestDurationMicroseconds) + try { + this.httpRequestDurationMicroseconds = new promClient.Histogram({ + name: 'http_request_duration_ms', + help: 'Duration of HTTP requests in ms', + labelNames: ['method', 'route', 'code'], + buckets: [1, 5, 15, 50, 100, 200, 300, 400, 500], // buckets for response time from 0.1ms to 500ms + registers: [this.register] // Explicitly set the registry + }) + this.counters.set('http_request_duration_ms', this.httpRequestDurationMicroseconds) + } catch (error) { + // If metric already exists, get it from the registry + const existingMetric = this.register.getSingleMetric('http_request_duration_ms') + if (existingMetric) { + this.httpRequestDurationMicroseconds = existingMetric as Histogram + this.counters.set('http_request_duration_ms', this.httpRequestDurationMicroseconds) + } + } - this.requestCounter = new Counter({ - name: 'http_requests_total', - help: 'Total number of HTTP requests', - labelNames: ['method', 'path', 'status'] - }) - this.counters.set('http_requests_total', this.requestCounter) + try { + this.requestCounter = new Counter({ + name: 'http_requests_total', + help: 'Total number of HTTP requests', + labelNames: ['method', 'path', 'status'], + registers: [this.register] // Explicitly set the registry + }) + this.counters.set('http_requests_total', this.requestCounter) + } catch (error) { + // If metric already exists, get it from the registry + const existingMetric = this.register.getSingleMetric('http_requests_total') + if (existingMetric) { + this.requestCounter = existingMetric as Counter + this.counters.set('http_requests_total', this.requestCounter) + } + } + // Only register metrics that aren't already in the registry this.registerMetrics() await this.setupMetricsEndpoint() } @@ -111,12 +153,28 @@ export class Prometheus implements IMetricsProvider { private registerMetrics() { if (process.env.METRICS_INCLUDE_NODE_METRICS !== 'false') { + // Clear any existing default metrics to avoid conflicts + promClient.register.clear() // enable default metrics like CPU usage, memory usage, etc. - promClient.collectDefaultMetrics({ register: this.register }) + // and ensure they're only registered with our custom registry + promClient.collectDefaultMetrics({ + register: this.register, + prefix: 'flowise_' // Add a prefix to avoid conflicts + }) } - // Add our custom metrics to the registry + + // Add only the custom metrics that haven't been registered yet for (const counter of this.counters.values()) { - this.register.registerMetric(counter) + try { + // Type assertion to access the name property + const metricName = (counter as any).name + if (!this.register.getSingleMetric(metricName)) { + this.register.registerMetric(counter) + } + } catch (error) { + // If we can't register the metric, it probably already exists + // Just continue with the next one + } } } }