* Add better logs to build chatflow functino

* Add connection logs to queue manager

* Redact credentials

* Add connection logs for redis pub-sub

* add more loggings around queue

---------

Co-authored-by: Henry <hzj94@hotmail.com>
This commit is contained in:
Ilango
2025-07-31 15:34:01 +05:30
committed by GitHub
parent 049596a7b5
commit ed27ad0c58
6 changed files with 227 additions and 40 deletions
+57 -16
View File
@@ -57,22 +57,63 @@ export abstract class BaseQueue {
} }
public createWorker(concurrency: number = WORKER_CONCURRENCY): Worker { public createWorker(concurrency: number = WORKER_CONCURRENCY): Worker {
this.worker = new Worker( logger.info(`[BaseQueue] Creating worker for queue "${this.queue.name}" with concurrency: ${concurrency}`)
this.queue.name,
async (job: Job) => { try {
const start = new Date().getTime() this.worker = new Worker(
logger.info(`Processing job ${job.id} in ${this.queue.name} at ${new Date().toISOString()}`) this.queue.name,
const result = await this.processJob(job.data) async (job: Job) => {
const end = new Date().getTime() const start = new Date().getTime()
logger.info(`Completed job ${job.id} in ${this.queue.name} at ${new Date().toISOString()} (${end - start}ms)`) logger.info(`[BaseQueue] Processing job ${job.id} in ${this.queue.name} at ${new Date().toISOString()}`)
return result try {
}, const result = await this.processJob(job.data)
{ const end = new Date().getTime()
connection: this.connection, logger.info(
concurrency `[BaseQueue] Completed job ${job.id} in ${this.queue.name} at ${new Date().toISOString()} (${end - start}ms)`
} )
) return result
return this.worker } catch (error) {
const end = new Date().getTime()
logger.error(
`[BaseQueue] Job ${job.id} failed in ${this.queue.name} at ${new Date().toISOString()} (${end - start}ms):`,
{ error }
)
throw error
}
},
{
connection: this.connection,
concurrency
}
)
// Add error listeners to the worker
this.worker.on('error', (err) => {
logger.error(`[BaseQueue] Worker error for queue "${this.queue.name}":`, { error: err })
})
this.worker.on('ready', () => {
logger.info(`[BaseQueue] Worker ready for queue "${this.queue.name}"`)
})
this.worker.on('closing', () => {
logger.info(`[BaseQueue] Worker closing for queue "${this.queue.name}"`)
})
this.worker.on('closed', () => {
logger.info(`[BaseQueue] Worker closed for queue "${this.queue.name}"`)
})
this.worker.on('failed', (job, err) => {
logger.error(`[BaseQueue] Worker job ${job?.id} failed in queue "${this.queue.name}":`, { error: err })
})
logger.info(`[BaseQueue] Worker created successfully for queue "${this.queue.name}"`)
return this.worker
} catch (error) {
logger.error(`[BaseQueue] Failed to create worker for queue "${this.queue.name}":`, { error })
throw error
}
} }
public async getJobs(): Promise<Job[]> { public async getJobs(): Promise<Job[]> {
@@ -100,6 +100,13 @@ export class PredictionQueue extends BaseQueue {
data.signal = signal data.signal = signal
} }
if (this.redisPublisher) {
logger.info(
`[PredictionQueue] RedisPublisher is connected [orgId:${data.orgId}/flowId:${data.chatflow.id}/chatId:${data.chatId}]`,
this.redisPublisher.isConnected()
)
}
return await executeFlow(data) return await executeFlow(data)
} }
} }
+69 -22
View File
@@ -12,6 +12,7 @@ import { BullMQAdapter } from '@bull-board/api/bullMQAdapter'
import { Express } from 'express' import { Express } from 'express'
import { UsageCacheManager } from '../UsageCacheManager' import { UsageCacheManager } from '../UsageCacheManager'
import { ExpressAdapter } from '@bull-board/express' import { ExpressAdapter } from '@bull-board/express'
import logger from '../utils/logger'
const QUEUE_NAME = process.env.QUEUE_NAME || 'flowise-queue' const QUEUE_NAME = process.env.QUEUE_NAME || 'flowise-queue'
@@ -25,30 +26,57 @@ export class QueueManager {
private predictionQueueEventsProducer?: QueueEventsProducer private predictionQueueEventsProducer?: QueueEventsProducer
private constructor() { private constructor() {
let tlsOpts = undefined if (process.env.REDIS_URL) {
if (process.env.REDIS_URL && process.env.REDIS_URL.startsWith('rediss://')) { let tlsOpts = undefined
tlsOpts = { if (process.env.REDIS_URL.startsWith('rediss://')) {
rejectUnauthorized: false tlsOpts = {
rejectUnauthorized: false
}
} else if (process.env.REDIS_TLS === 'true') {
tlsOpts = {
cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined,
key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined,
ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined
}
} }
} else if (process.env.REDIS_TLS === 'true') { this.connection = {
tlsOpts = { url: process.env.REDIS_URL,
cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined, tls: tlsOpts,
key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined, enableReadyCheck: true,
ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined keepAlive:
process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10))
? parseInt(process.env.REDIS_KEEP_ALIVE, 10)
: undefined
} }
} logger.info(
this.connection = { `[QueueManager] Connecting to Redis using URL: ${process.env.REDIS_URL.replace(/\/\/[^:]+:[^@]+@/, '//[CREDENTIALS]@')}`
url: process.env.REDIS_URL || undefined, )
host: process.env.REDIS_HOST || 'localhost', } else {
port: parseInt(process.env.REDIS_PORT || '6379'), let tlsOpts = undefined
username: process.env.REDIS_USERNAME || undefined, if (process.env.REDIS_TLS === 'true') {
password: process.env.REDIS_PASSWORD || undefined, tlsOpts = {
tls: tlsOpts, cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined,
enableReadyCheck: true, key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined,
keepAlive: ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined
process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10)) }
? parseInt(process.env.REDIS_KEEP_ALIVE, 10) }
: undefined this.connection = {
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
username: process.env.REDIS_USERNAME || undefined,
password: process.env.REDIS_PASSWORD || undefined,
tls: tlsOpts,
enableReadyCheck: true,
keepAlive:
process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10))
? parseInt(process.env.REDIS_KEEP_ALIVE, 10)
: undefined
}
logger.info(
`[QueueManager] Connecting to Redis using host:port: ${process.env.REDIS_HOST || 'localhost'}:${
process.env.REDIS_PORT || '6379'
}`
)
} }
} }
@@ -120,6 +148,16 @@ export class QueueManager {
usageCacheManager usageCacheManager
}) })
this.registerQueue('prediction', predictionQueue) this.registerQueue('prediction', predictionQueue)
// Add connection event logging for prediction queue
if (predictionQueue.getQueue().opts.connection) {
const connInfo = predictionQueue.getQueue().opts.connection || {}
const connInfoString = JSON.stringify(connInfo)
.replace(/"username":"[^"]*"/g, '"username":"[REDACTED]"')
.replace(/"password":"[^"]*"/g, '"password":"[REDACTED]"')
logger.info(`[QueueManager] Prediction queue connected to Redis: ${connInfoString}`)
}
this.predictionQueueEventsProducer = new QueueEventsProducer(predictionQueue.getQueueName(), { this.predictionQueueEventsProducer = new QueueEventsProducer(predictionQueue.getQueueName(), {
connection: this.connection connection: this.connection
}) })
@@ -134,6 +172,15 @@ export class QueueManager {
}) })
this.registerQueue('upsert', upsertionQueue) this.registerQueue('upsert', upsertionQueue)
// Add connection event logging for upsert queue
if (upsertionQueue.getQueue().opts.connection) {
const connInfo = upsertionQueue.getQueue().opts.connection || {}
const connInfoString = JSON.stringify(connInfo)
.replace(/"username":"[^"]*"/g, '"username":"[REDACTED]"')
.replace(/"password":"[^"]*"/g, '"password":"[REDACTED]"')
logger.info(`[QueueManager] Upsert queue connected to Redis: ${connInfoString}`)
}
if (serverAdapter) { if (serverAdapter) {
createBullBoard({ createBullBoard({
queues: [new BullMQAdapter(predictionQueue.getQueue()), new BullMQAdapter(upsertionQueue.getQueue())], queues: [new BullMQAdapter(predictionQueue.getQueue()), new BullMQAdapter(upsertionQueue.getQueue())],
@@ -1,5 +1,6 @@
import { IServerSideEventStreamer } from 'flowise-components' import { IServerSideEventStreamer } from 'flowise-components'
import { createClient } from 'redis' import { createClient } from 'redis'
import logger from '../utils/logger'
export class RedisEventPublisher implements IServerSideEventStreamer { export class RedisEventPublisher implements IServerSideEventStreamer {
private redisPublisher: ReturnType<typeof createClient> private redisPublisher: ReturnType<typeof createClient>
@@ -41,10 +42,55 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
: undefined : undefined
}) })
} }
this.setupEventListeners()
}
private setupEventListeners() {
this.redisPublisher.on('connect', () => {
logger.info(`[RedisEventPublisher] Redis client connecting...`)
})
this.redisPublisher.on('ready', () => {
logger.info(`[RedisEventPublisher] Redis client ready and connected`)
})
this.redisPublisher.on('error', (err) => {
logger.error(`[RedisEventPublisher] Redis client error:`, {
error: err,
isReady: this.redisPublisher.isReady,
isOpen: this.redisPublisher.isOpen
})
})
this.redisPublisher.on('end', () => {
logger.warn(`[RedisEventPublisher] Redis client connection ended`)
})
this.redisPublisher.on('reconnecting', () => {
logger.info(`[RedisEventPublisher] Redis client reconnecting...`)
})
}
isConnected() {
return this.redisPublisher.isReady
} }
async connect() { async connect() {
logger.info(`[RedisEventPublisher] Connecting to Redis...`)
await this.redisPublisher.connect() await this.redisPublisher.connect()
// Log connection details after successful connection
const connInfo = this.redisPublisher.options?.socket
const connInfoString = JSON.stringify(connInfo)
.replace(/"username":"[^"]*"/g, '"username":"[REDACTED]"')
.replace(/"password":"[^"]*"/g, '"password":"[REDACTED]"')
logger.info(`[RedisEventPublisher] Connected to Redis: ${connInfoString}`)
// Add error event listener
this.redisPublisher.on('error', (err) => {
logger.error(`[RedisEventPublisher] Redis connection error`, { error: err })
})
} }
streamCustomEvent(chatId: string, eventType: string, data: any) { streamCustomEvent(chatId: string, eventType: string, data: any) {
@@ -1,5 +1,6 @@
import { createClient } from 'redis' import { createClient } from 'redis'
import { SSEStreamer } from '../utils/SSEStreamer' import { SSEStreamer } from '../utils/SSEStreamer'
import logger from '../utils/logger'
export class RedisEventSubscriber { export class RedisEventSubscriber {
private redisSubscriber: ReturnType<typeof createClient> private redisSubscriber: ReturnType<typeof createClient>
@@ -44,10 +45,52 @@ export class RedisEventSubscriber {
}) })
} }
this.sseStreamer = sseStreamer this.sseStreamer = sseStreamer
this.setupEventListeners()
}
private setupEventListeners() {
this.redisSubscriber.on('connect', () => {
logger.info(`[RedisEventSubscriber] Redis client connecting...`)
})
this.redisSubscriber.on('ready', () => {
logger.info(`[RedisEventSubscriber] Redis client ready and connected`)
})
this.redisSubscriber.on('error', (err) => {
logger.error(`[RedisEventSubscriber] Redis client error:`, {
error: err,
isReady: this.redisSubscriber.isReady,
isOpen: this.redisSubscriber.isOpen,
subscribedChannelsCount: this.subscribedChannels.size
})
})
this.redisSubscriber.on('end', () => {
logger.warn(`[RedisEventSubscriber] Redis client connection ended`)
})
this.redisSubscriber.on('reconnecting', () => {
logger.info(`[RedisEventSubscriber] Redis client reconnecting...`)
})
} }
async connect() { async connect() {
logger.info(`[RedisEventSubscriber] Connecting to Redis...`)
await this.redisSubscriber.connect() await this.redisSubscriber.connect()
// Log connection details after successful connection
const connInfo = this.redisSubscriber.options?.socket
const connInfoString = JSON.stringify(connInfo)
.replace(/"username":"[^"]*"/g, '"username":"[REDACTED]"')
.replace(/"password":"[^"]*"/g, '"password":"[REDACTED]"')
logger.info(`[RedisEventSubscriber] Connected to Redis: ${connInfoString}`)
// Add error event listener
this.redisSubscriber.on('error', (err) => {
logger.error(`[RedisEventSubscriber] Redis connection error`, { error: err })
})
} }
subscribe(channel: string) { subscribe(channel: string) {
+5 -2
View File
@@ -922,6 +922,8 @@ export const utilBuildChatflow = async (req: Request, isInternal: boolean = fals
chatflow.analytic = JSON.stringify(newEval) chatflow.analytic = JSON.stringify(newEval)
} }
let organizationId = ''
try { try {
// Validate API Key if its external API request // Validate API Key if its external API request
if (!isInternal) { if (!isInternal) {
@@ -949,6 +951,7 @@ export const utilBuildChatflow = async (req: Request, isInternal: boolean = fals
} }
const orgId = org.id const orgId = org.id
organizationId = orgId
const subscriptionId = org.subscriptionId as string const subscriptionId = org.subscriptionId as string
await checkPredictions(orgId, subscriptionId, appServer.usageCacheManager) await checkPredictions(orgId, subscriptionId, appServer.usageCacheManager)
@@ -977,7 +980,7 @@ export const utilBuildChatflow = async (req: Request, isInternal: boolean = fals
if (process.env.MODE === MODE.QUEUE) { if (process.env.MODE === MODE.QUEUE) {
const predictionQueue = appServer.queueManager.getQueue('prediction') const predictionQueue = appServer.queueManager.getQueue('prediction')
const job = await predictionQueue.addJob(omit(executeData, OMIT_QUEUE_JOB_DATA)) const job = await predictionQueue.addJob(omit(executeData, OMIT_QUEUE_JOB_DATA))
logger.debug(`[server]: [${orgId}]: Job added to queue: ${job.id}`) logger.debug(`[server]: [${orgId}/${chatflow.id}/${chatId}]: Job added to queue: ${job.id}`)
const queueEvents = predictionQueue.getQueueEvents() const queueEvents = predictionQueue.getQueueEvents()
const result = await job.waitUntilFinished(queueEvents) const result = await job.waitUntilFinished(queueEvents)
@@ -1002,7 +1005,7 @@ export const utilBuildChatflow = async (req: Request, isInternal: boolean = fals
return result return result
} }
} catch (e) { } catch (e) {
logger.error('[server]: Error:', e) logger.error(`[server]:${organizationId}/${chatflow.id}/${chatId} Error:`, e)
appServer.abortControllerPool.remove(`${chatflow.id}_${chatId}`) appServer.abortControllerPool.remove(`${chatflow.id}_${chatId}`)
incrementFailedMetricCounter(appServer.metricsProvider, isInternal, isAgentFlow) incrementFailedMetricCounter(appServer.metricsProvider, isInternal, isAgentFlow)
if (e instanceof InternalFlowiseError && e.statusCode === StatusCodes.UNAUTHORIZED) { if (e instanceof InternalFlowiseError && e.statusCode === StatusCodes.UNAUTHORIZED) {