diff --git a/packages/server/src/queue/BaseQueue.ts b/packages/server/src/queue/BaseQueue.ts index d3bf18d2..49e63453 100644 --- a/packages/server/src/queue/BaseQueue.ts +++ b/packages/server/src/queue/BaseQueue.ts @@ -57,22 +57,63 @@ export abstract class BaseQueue { } public createWorker(concurrency: number = WORKER_CONCURRENCY): Worker { - this.worker = new Worker( - this.queue.name, - async (job: Job) => { - const start = new Date().getTime() - logger.info(`Processing job ${job.id} in ${this.queue.name} at ${new Date().toISOString()}`) - const result = await this.processJob(job.data) - const end = new Date().getTime() - logger.info(`Completed job ${job.id} in ${this.queue.name} at ${new Date().toISOString()} (${end - start}ms)`) - return result - }, - { - connection: this.connection, - concurrency - } - ) - return this.worker + logger.info(`[BaseQueue] Creating worker for queue "${this.queue.name}" with concurrency: ${concurrency}`) + + try { + this.worker = new Worker( + this.queue.name, + async (job: Job) => { + const start = new Date().getTime() + logger.info(`[BaseQueue] Processing job ${job.id} in ${this.queue.name} at ${new Date().toISOString()}`) + try { + const result = await this.processJob(job.data) + const end = new Date().getTime() + logger.info( + `[BaseQueue] Completed job ${job.id} in ${this.queue.name} at ${new Date().toISOString()} (${end - start}ms)` + ) + return result + } catch (error) { + const end = new Date().getTime() + logger.error( + `[BaseQueue] Job ${job.id} failed in ${this.queue.name} at ${new Date().toISOString()} (${end - start}ms):`, + { error } + ) + throw error + } + }, + { + connection: this.connection, + concurrency + } + ) + + // Add error listeners to the worker + this.worker.on('error', (err) => { + logger.error(`[BaseQueue] Worker error for queue "${this.queue.name}":`, { error: err }) + }) + + this.worker.on('ready', () => { + logger.info(`[BaseQueue] Worker ready for queue "${this.queue.name}"`) + }) + + this.worker.on('closing', () => { + logger.info(`[BaseQueue] Worker closing for queue "${this.queue.name}"`) + }) + + this.worker.on('closed', () => { + logger.info(`[BaseQueue] Worker closed for queue "${this.queue.name}"`) + }) + + this.worker.on('failed', (job, err) => { + logger.error(`[BaseQueue] Worker job ${job?.id} failed in queue "${this.queue.name}":`, { error: err }) + }) + + logger.info(`[BaseQueue] Worker created successfully for queue "${this.queue.name}"`) + return this.worker + } catch (error) { + logger.error(`[BaseQueue] Failed to create worker for queue "${this.queue.name}":`, { error }) + throw error + } } public async getJobs(): Promise { diff --git a/packages/server/src/queue/PredictionQueue.ts b/packages/server/src/queue/PredictionQueue.ts index 10cc125f..af0c9d8c 100644 --- a/packages/server/src/queue/PredictionQueue.ts +++ b/packages/server/src/queue/PredictionQueue.ts @@ -100,6 +100,13 @@ export class PredictionQueue extends BaseQueue { data.signal = signal } + if (this.redisPublisher) { + logger.info( + `[PredictionQueue] RedisPublisher is connected [orgId:${data.orgId}/flowId:${data.chatflow.id}/chatId:${data.chatId}]`, + this.redisPublisher.isConnected() + ) + } + return await executeFlow(data) } } diff --git a/packages/server/src/queue/QueueManager.ts b/packages/server/src/queue/QueueManager.ts index fa69363e..7a17cfd1 100644 --- a/packages/server/src/queue/QueueManager.ts +++ b/packages/server/src/queue/QueueManager.ts @@ -12,6 +12,7 @@ import { BullMQAdapter } from '@bull-board/api/bullMQAdapter' import { Express } from 'express' import { UsageCacheManager } from '../UsageCacheManager' import { ExpressAdapter } from '@bull-board/express' +import logger from '../utils/logger' const QUEUE_NAME = process.env.QUEUE_NAME || 'flowise-queue' @@ -25,30 +26,57 @@ export class QueueManager { private predictionQueueEventsProducer?: QueueEventsProducer private constructor() { - let tlsOpts = undefined - if (process.env.REDIS_URL && process.env.REDIS_URL.startsWith('rediss://')) { - tlsOpts = { - rejectUnauthorized: false + if (process.env.REDIS_URL) { + let tlsOpts = undefined + if (process.env.REDIS_URL.startsWith('rediss://')) { + tlsOpts = { + rejectUnauthorized: false + } + } else if (process.env.REDIS_TLS === 'true') { + tlsOpts = { + cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined, + key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined, + ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined + } } - } else if (process.env.REDIS_TLS === 'true') { - tlsOpts = { - cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined, - key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined, - ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined + this.connection = { + url: process.env.REDIS_URL, + tls: tlsOpts, + enableReadyCheck: true, + keepAlive: + process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10)) + ? parseInt(process.env.REDIS_KEEP_ALIVE, 10) + : undefined } - } - this.connection = { - url: process.env.REDIS_URL || undefined, - host: process.env.REDIS_HOST || 'localhost', - port: parseInt(process.env.REDIS_PORT || '6379'), - username: process.env.REDIS_USERNAME || undefined, - password: process.env.REDIS_PASSWORD || undefined, - tls: tlsOpts, - enableReadyCheck: true, - keepAlive: - process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10)) - ? parseInt(process.env.REDIS_KEEP_ALIVE, 10) - : undefined + logger.info( + `[QueueManager] Connecting to Redis using URL: ${process.env.REDIS_URL.replace(/\/\/[^:]+:[^@]+@/, '//[CREDENTIALS]@')}` + ) + } else { + let tlsOpts = undefined + if (process.env.REDIS_TLS === 'true') { + tlsOpts = { + cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined, + key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined, + ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined + } + } + this.connection = { + host: process.env.REDIS_HOST || 'localhost', + port: parseInt(process.env.REDIS_PORT || '6379'), + username: process.env.REDIS_USERNAME || undefined, + password: process.env.REDIS_PASSWORD || undefined, + tls: tlsOpts, + enableReadyCheck: true, + keepAlive: + process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10)) + ? parseInt(process.env.REDIS_KEEP_ALIVE, 10) + : undefined + } + logger.info( + `[QueueManager] Connecting to Redis using host:port: ${process.env.REDIS_HOST || 'localhost'}:${ + process.env.REDIS_PORT || '6379' + }` + ) } } @@ -120,6 +148,16 @@ export class QueueManager { usageCacheManager }) this.registerQueue('prediction', predictionQueue) + + // Add connection event logging for prediction queue + if (predictionQueue.getQueue().opts.connection) { + const connInfo = predictionQueue.getQueue().opts.connection || {} + const connInfoString = JSON.stringify(connInfo) + .replace(/"username":"[^"]*"/g, '"username":"[REDACTED]"') + .replace(/"password":"[^"]*"/g, '"password":"[REDACTED]"') + logger.info(`[QueueManager] Prediction queue connected to Redis: ${connInfoString}`) + } + this.predictionQueueEventsProducer = new QueueEventsProducer(predictionQueue.getQueueName(), { connection: this.connection }) @@ -134,6 +172,15 @@ export class QueueManager { }) this.registerQueue('upsert', upsertionQueue) + // Add connection event logging for upsert queue + if (upsertionQueue.getQueue().opts.connection) { + const connInfo = upsertionQueue.getQueue().opts.connection || {} + const connInfoString = JSON.stringify(connInfo) + .replace(/"username":"[^"]*"/g, '"username":"[REDACTED]"') + .replace(/"password":"[^"]*"/g, '"password":"[REDACTED]"') + logger.info(`[QueueManager] Upsert queue connected to Redis: ${connInfoString}`) + } + if (serverAdapter) { createBullBoard({ queues: [new BullMQAdapter(predictionQueue.getQueue()), new BullMQAdapter(upsertionQueue.getQueue())], diff --git a/packages/server/src/queue/RedisEventPublisher.ts b/packages/server/src/queue/RedisEventPublisher.ts index c305757a..f2b60150 100644 --- a/packages/server/src/queue/RedisEventPublisher.ts +++ b/packages/server/src/queue/RedisEventPublisher.ts @@ -1,5 +1,6 @@ import { IServerSideEventStreamer } from 'flowise-components' import { createClient } from 'redis' +import logger from '../utils/logger' export class RedisEventPublisher implements IServerSideEventStreamer { private redisPublisher: ReturnType @@ -41,10 +42,55 @@ export class RedisEventPublisher implements IServerSideEventStreamer { : undefined }) } + + this.setupEventListeners() + } + + private setupEventListeners() { + this.redisPublisher.on('connect', () => { + logger.info(`[RedisEventPublisher] Redis client connecting...`) + }) + + this.redisPublisher.on('ready', () => { + logger.info(`[RedisEventPublisher] Redis client ready and connected`) + }) + + this.redisPublisher.on('error', (err) => { + logger.error(`[RedisEventPublisher] Redis client error:`, { + error: err, + isReady: this.redisPublisher.isReady, + isOpen: this.redisPublisher.isOpen + }) + }) + + this.redisPublisher.on('end', () => { + logger.warn(`[RedisEventPublisher] Redis client connection ended`) + }) + + this.redisPublisher.on('reconnecting', () => { + logger.info(`[RedisEventPublisher] Redis client reconnecting...`) + }) + } + + isConnected() { + return this.redisPublisher.isReady } async connect() { + logger.info(`[RedisEventPublisher] Connecting to Redis...`) await this.redisPublisher.connect() + + // Log connection details after successful connection + const connInfo = this.redisPublisher.options?.socket + const connInfoString = JSON.stringify(connInfo) + .replace(/"username":"[^"]*"/g, '"username":"[REDACTED]"') + .replace(/"password":"[^"]*"/g, '"password":"[REDACTED]"') + logger.info(`[RedisEventPublisher] Connected to Redis: ${connInfoString}`) + + // Add error event listener + this.redisPublisher.on('error', (err) => { + logger.error(`[RedisEventPublisher] Redis connection error`, { error: err }) + }) } streamCustomEvent(chatId: string, eventType: string, data: any) { diff --git a/packages/server/src/queue/RedisEventSubscriber.ts b/packages/server/src/queue/RedisEventSubscriber.ts index 49c4cb9e..a202bdc6 100644 --- a/packages/server/src/queue/RedisEventSubscriber.ts +++ b/packages/server/src/queue/RedisEventSubscriber.ts @@ -1,5 +1,6 @@ import { createClient } from 'redis' import { SSEStreamer } from '../utils/SSEStreamer' +import logger from '../utils/logger' export class RedisEventSubscriber { private redisSubscriber: ReturnType @@ -44,10 +45,52 @@ export class RedisEventSubscriber { }) } this.sseStreamer = sseStreamer + + this.setupEventListeners() + } + + private setupEventListeners() { + this.redisSubscriber.on('connect', () => { + logger.info(`[RedisEventSubscriber] Redis client connecting...`) + }) + + this.redisSubscriber.on('ready', () => { + logger.info(`[RedisEventSubscriber] Redis client ready and connected`) + }) + + this.redisSubscriber.on('error', (err) => { + logger.error(`[RedisEventSubscriber] Redis client error:`, { + error: err, + isReady: this.redisSubscriber.isReady, + isOpen: this.redisSubscriber.isOpen, + subscribedChannelsCount: this.subscribedChannels.size + }) + }) + + this.redisSubscriber.on('end', () => { + logger.warn(`[RedisEventSubscriber] Redis client connection ended`) + }) + + this.redisSubscriber.on('reconnecting', () => { + logger.info(`[RedisEventSubscriber] Redis client reconnecting...`) + }) } async connect() { + logger.info(`[RedisEventSubscriber] Connecting to Redis...`) await this.redisSubscriber.connect() + + // Log connection details after successful connection + const connInfo = this.redisSubscriber.options?.socket + const connInfoString = JSON.stringify(connInfo) + .replace(/"username":"[^"]*"/g, '"username":"[REDACTED]"') + .replace(/"password":"[^"]*"/g, '"password":"[REDACTED]"') + logger.info(`[RedisEventSubscriber] Connected to Redis: ${connInfoString}`) + + // Add error event listener + this.redisSubscriber.on('error', (err) => { + logger.error(`[RedisEventSubscriber] Redis connection error`, { error: err }) + }) } subscribe(channel: string) { diff --git a/packages/server/src/utils/buildChatflow.ts b/packages/server/src/utils/buildChatflow.ts index d71f4f27..be0bb6a1 100644 --- a/packages/server/src/utils/buildChatflow.ts +++ b/packages/server/src/utils/buildChatflow.ts @@ -922,6 +922,8 @@ export const utilBuildChatflow = async (req: Request, isInternal: boolean = fals chatflow.analytic = JSON.stringify(newEval) } + let organizationId = '' + try { // Validate API Key if its external API request if (!isInternal) { @@ -949,6 +951,7 @@ export const utilBuildChatflow = async (req: Request, isInternal: boolean = fals } const orgId = org.id + organizationId = orgId const subscriptionId = org.subscriptionId as string await checkPredictions(orgId, subscriptionId, appServer.usageCacheManager) @@ -977,7 +980,7 @@ export const utilBuildChatflow = async (req: Request, isInternal: boolean = fals if (process.env.MODE === MODE.QUEUE) { const predictionQueue = appServer.queueManager.getQueue('prediction') const job = await predictionQueue.addJob(omit(executeData, OMIT_QUEUE_JOB_DATA)) - logger.debug(`[server]: [${orgId}]: Job added to queue: ${job.id}`) + logger.debug(`[server]: [${orgId}/${chatflow.id}/${chatId}]: Job added to queue: ${job.id}`) const queueEvents = predictionQueue.getQueueEvents() const result = await job.waitUntilFinished(queueEvents) @@ -1002,7 +1005,7 @@ export const utilBuildChatflow = async (req: Request, isInternal: boolean = fals return result } } catch (e) { - logger.error('[server]: Error:', e) + logger.error(`[server]:${organizationId}/${chatflow.id}/${chatId} Error:`, e) appServer.abortControllerPool.remove(`${chatflow.id}_${chatId}`) incrementFailedMetricCounter(appServer.metricsProvider, isInternal, isAgentFlow) if (e instanceof InternalFlowiseError && e.statusCode === StatusCodes.UNAUTHORIZED) {