Feature/Add bullmq redis for message queue processing (#3568)

* add bullmq redis for message queue processing

* Update pnpm-lock.yaml

* update queue manager

* remove singleton patterns, add redis to cache pool

* add bull board ui

* update rate limit handler

* update redis configuration

* Merge add rate limit redis prefix

* update rate limit queue events

* update preview loader to queue

* refractor namings to constants

* update env variable for queue

* update worker shutdown gracefully
This commit is contained in:
Henry Heng
2025-01-23 14:08:02 +00:00
committed by GitHub
parent 14adb936f2
commit a2a475ba7a
59 changed files with 38958 additions and 36985 deletions
+81
View File
@@ -0,0 +1,81 @@
import { Queue, Worker, Job, QueueEvents, RedisOptions } from 'bullmq'
import { v4 as uuidv4 } from 'uuid'
import logger from '../utils/logger'
const QUEUE_REDIS_EVENT_STREAM_MAX_LEN = process.env.QUEUE_REDIS_EVENT_STREAM_MAX_LEN
? parseInt(process.env.QUEUE_REDIS_EVENT_STREAM_MAX_LEN)
: 10000
const WORKER_CONCURRENCY = process.env.WORKER_CONCURRENCY ? parseInt(process.env.WORKER_CONCURRENCY) : 100000
export abstract class BaseQueue {
protected queue: Queue
protected queueEvents: QueueEvents
protected connection: RedisOptions
private worker: Worker
constructor(queueName: string, connection: RedisOptions) {
this.connection = connection
this.queue = new Queue(queueName, {
connection: this.connection,
streams: { events: { maxLen: QUEUE_REDIS_EVENT_STREAM_MAX_LEN } }
})
this.queueEvents = new QueueEvents(queueName, { connection: this.connection })
}
abstract processJob(data: any): Promise<any>
abstract getQueueName(): string
abstract getQueue(): Queue
public getWorker(): Worker {
return this.worker
}
public async addJob(jobData: any): Promise<Job> {
const jobId = jobData.id || uuidv4()
return await this.queue.add(jobId, jobData, { removeOnFail: true })
}
public createWorker(concurrency: number = WORKER_CONCURRENCY): Worker {
this.worker = new Worker(
this.queue.name,
async (job: Job) => {
const start = new Date().getTime()
logger.info(`Processing job ${job.id} in ${this.queue.name} at ${new Date().toISOString()}`)
const result = await this.processJob(job.data)
const end = new Date().getTime()
logger.info(`Completed job ${job.id} in ${this.queue.name} at ${new Date().toISOString()} (${end - start}ms)`)
return result
},
{
connection: this.connection,
concurrency
}
)
return this.worker
}
public async getJobs(): Promise<Job[]> {
return await this.queue.getJobs()
}
public async getJobCounts(): Promise<{ [index: string]: number }> {
return await this.queue.getJobCounts()
}
public async getJobByName(jobName: string): Promise<Job> {
const jobs = await this.queue.getJobs()
const job = jobs.find((job) => job.name === jobName)
if (!job) throw new Error(`Job name ${jobName} not found`)
return job
}
public getQueueEvents(): QueueEvents {
return this.queueEvents
}
public async clearQueue(): Promise<void> {
await this.queue.obliterate({ force: true })
}
}
@@ -0,0 +1,64 @@
import { DataSource } from 'typeorm'
import { executeFlow } from '../utils/buildChatflow'
import { IComponentNodes, IExecuteFlowParams } from '../Interface'
import { Telemetry } from '../utils/telemetry'
import { CachePool } from '../CachePool'
import { RedisEventPublisher } from './RedisEventPublisher'
import { AbortControllerPool } from '../AbortControllerPool'
import { BaseQueue } from './BaseQueue'
import { RedisOptions } from 'bullmq'
interface PredictionQueueOptions {
appDataSource: DataSource
telemetry: Telemetry
cachePool: CachePool
componentNodes: IComponentNodes
abortControllerPool: AbortControllerPool
}
export class PredictionQueue extends BaseQueue {
private componentNodes: IComponentNodes
private telemetry: Telemetry
private cachePool: CachePool
private appDataSource: DataSource
private abortControllerPool: AbortControllerPool
private redisPublisher: RedisEventPublisher
private queueName: string
constructor(name: string, connection: RedisOptions, options: PredictionQueueOptions) {
super(name, connection)
this.queueName = name
this.componentNodes = options.componentNodes || {}
this.telemetry = options.telemetry
this.cachePool = options.cachePool
this.appDataSource = options.appDataSource
this.abortControllerPool = options.abortControllerPool
this.redisPublisher = new RedisEventPublisher()
this.redisPublisher.connect()
}
public getQueueName() {
return this.queueName
}
public getQueue() {
return this.queue
}
async processJob(data: IExecuteFlowParams) {
if (this.appDataSource) data.appDataSource = this.appDataSource
if (this.telemetry) data.telemetry = this.telemetry
if (this.cachePool) data.cachePool = this.cachePool
if (this.componentNodes) data.componentNodes = this.componentNodes
if (this.redisPublisher) data.sseStreamer = this.redisPublisher
if (this.abortControllerPool) {
const abortControllerId = `${data.chatflow.id}_${data.chatId}`
const signal = new AbortController()
this.abortControllerPool.add(abortControllerId, signal)
data.signal = signal
}
return await executeFlow(data)
}
}
+127
View File
@@ -0,0 +1,127 @@
import { BaseQueue } from './BaseQueue'
import { PredictionQueue } from './PredictionQueue'
import { UpsertQueue } from './UpsertQueue'
import { IComponentNodes } from '../Interface'
import { Telemetry } from '../utils/telemetry'
import { CachePool } from '../CachePool'
import { DataSource } from 'typeorm'
import { AbortControllerPool } from '../AbortControllerPool'
import { QueueEventsProducer, RedisOptions } from 'bullmq'
import { createBullBoard } from 'bull-board'
import { BullMQAdapter } from 'bull-board/bullMQAdapter'
import { Express } from 'express'
const QUEUE_NAME = process.env.QUEUE_NAME || 'flowise-queue'
type QUEUE_TYPE = 'prediction' | 'upsert'
export class QueueManager {
private static instance: QueueManager
private queues: Map<string, BaseQueue> = new Map()
private connection: RedisOptions
private bullBoardRouter?: Express
private predictionQueueEventsProducer?: QueueEventsProducer
private constructor() {
let tlsOpts = undefined
if (process.env.REDIS_URL && process.env.REDIS_URL.startsWith('rediss://')) {
tlsOpts = {
rejectUnauthorized: false
}
} else if (process.env.REDIS_TLS === 'true') {
tlsOpts = {
cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined,
key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined,
ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined
}
}
this.connection = {
url: process.env.REDIS_URL || undefined,
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
username: process.env.REDIS_USERNAME || undefined,
password: process.env.REDIS_PASSWORD || undefined,
tls: tlsOpts
}
}
public static getInstance(): QueueManager {
if (!QueueManager.instance) {
QueueManager.instance = new QueueManager()
}
return QueueManager.instance
}
public registerQueue(name: string, queue: BaseQueue) {
this.queues.set(name, queue)
}
public getConnection() {
return this.connection
}
public getQueue(name: QUEUE_TYPE): BaseQueue {
const queue = this.queues.get(name)
if (!queue) throw new Error(`Queue ${name} not found`)
return queue
}
public getPredictionQueueEventsProducer(): QueueEventsProducer {
if (!this.predictionQueueEventsProducer) throw new Error('Prediction queue events producer not found')
return this.predictionQueueEventsProducer
}
public getBullBoardRouter(): Express {
if (!this.bullBoardRouter) throw new Error('BullBoard router not found')
return this.bullBoardRouter
}
public async getAllJobCounts(): Promise<{ [queueName: string]: { [status: string]: number } }> {
const counts: { [queueName: string]: { [status: string]: number } } = {}
for (const [name, queue] of this.queues) {
counts[name] = await queue.getJobCounts()
}
return counts
}
public setupAllQueues({
componentNodes,
telemetry,
cachePool,
appDataSource,
abortControllerPool
}: {
componentNodes: IComponentNodes
telemetry: Telemetry
cachePool: CachePool
appDataSource: DataSource
abortControllerPool: AbortControllerPool
}) {
const predictionQueueName = `${QUEUE_NAME}-prediction`
const predictionQueue = new PredictionQueue(predictionQueueName, this.connection, {
componentNodes,
telemetry,
cachePool,
appDataSource,
abortControllerPool
})
this.registerQueue('prediction', predictionQueue)
this.predictionQueueEventsProducer = new QueueEventsProducer(predictionQueue.getQueueName(), {
connection: this.connection
})
const upsertionQueueName = `${QUEUE_NAME}-upsertion`
const upsertionQueue = new UpsertQueue(upsertionQueueName, this.connection, {
componentNodes,
telemetry,
cachePool,
appDataSource
})
this.registerQueue('upsert', upsertionQueue)
const bullboard = createBullBoard([new BullMQAdapter(predictionQueue.getQueue()), new BullMQAdapter(upsertionQueue.getQueue())])
this.bullBoardRouter = bullboard.router
}
}
@@ -0,0 +1,262 @@
import { IServerSideEventStreamer } from 'flowise-components'
import { createClient } from 'redis'
export class RedisEventPublisher implements IServerSideEventStreamer {
private redisPublisher: ReturnType<typeof createClient>
constructor() {
if (process.env.REDIS_URL) {
this.redisPublisher = createClient({
url: process.env.REDIS_URL
})
} else {
this.redisPublisher = createClient({
username: process.env.REDIS_USERNAME || undefined,
password: process.env.REDIS_PASSWORD || undefined,
socket: {
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
tls: process.env.REDIS_TLS === 'true',
cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined,
key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined,
ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined
}
})
}
}
async connect() {
await this.redisPublisher.connect()
}
streamCustomEvent(chatId: string, eventType: string, data: any) {
try {
this.redisPublisher.publish(
chatId,
JSON.stringify({
chatId,
eventType,
data
})
)
} catch (error) {
console.error('Error streaming custom event:', error)
}
}
streamStartEvent(chatId: string, data: string) {
try {
this.redisPublisher.publish(
chatId,
JSON.stringify({
chatId,
eventType: 'start',
data
})
)
} catch (error) {
console.error('Error streaming start event:', error)
}
}
streamTokenEvent(chatId: string, data: string) {
try {
this.redisPublisher.publish(
chatId,
JSON.stringify({
chatId,
eventType: 'token',
data
})
)
} catch (error) {
console.error('Error streaming token event:', error)
}
}
streamSourceDocumentsEvent(chatId: string, data: any) {
try {
this.redisPublisher.publish(
chatId,
JSON.stringify({
chatId,
eventType: 'sourceDocuments',
data
})
)
} catch (error) {
console.error('Error streaming sourceDocuments event:', error)
}
}
streamArtifactsEvent(chatId: string, data: any) {
try {
this.redisPublisher.publish(
chatId,
JSON.stringify({
chatId,
eventType: 'artifacts',
data
})
)
} catch (error) {
console.error('Error streaming artifacts event:', error)
}
}
streamUsedToolsEvent(chatId: string, data: any) {
try {
this.redisPublisher.publish(
chatId,
JSON.stringify({
chatId,
eventType: 'usedTools',
data
})
)
} catch (error) {
console.error('Error streaming usedTools event:', error)
}
}
streamFileAnnotationsEvent(chatId: string, data: any) {
try {
this.redisPublisher.publish(
chatId,
JSON.stringify({
chatId,
eventType: 'fileAnnotations',
data
})
)
} catch (error) {
console.error('Error streaming fileAnnotations event:', error)
}
}
streamToolEvent(chatId: string, data: any): void {
try {
this.redisPublisher.publish(
chatId,
JSON.stringify({
chatId,
eventType: 'tool',
data
})
)
} catch (error) {
console.error('Error streaming tool event:', error)
}
}
streamAgentReasoningEvent(chatId: string, data: any): void {
try {
this.redisPublisher.publish(
chatId,
JSON.stringify({
chatId,
eventType: 'agentReasoning',
data
})
)
} catch (error) {
console.error('Error streaming agentReasoning event:', error)
}
}
streamNextAgentEvent(chatId: string, data: any): void {
try {
this.redisPublisher.publish(
chatId,
JSON.stringify({
chatId,
eventType: 'nextAgent',
data
})
)
} catch (error) {
console.error('Error streaming nextAgent event:', error)
}
}
streamActionEvent(chatId: string, data: any): void {
try {
this.redisPublisher.publish(
chatId,
JSON.stringify({
chatId,
eventType: 'action',
data
})
)
} catch (error) {
console.error('Error streaming action event:', error)
}
}
streamAbortEvent(chatId: string): void {
try {
this.redisPublisher.publish(
chatId,
JSON.stringify({
chatId,
eventType: 'abort',
data: '[DONE]'
})
)
} catch (error) {
console.error('Error streaming abort event:', error)
}
}
streamEndEvent(_: string) {
// placeholder for future use
}
streamErrorEvent(chatId: string, msg: string) {
try {
this.redisPublisher.publish(
chatId,
JSON.stringify({
chatId,
eventType: 'error',
data: msg
})
)
} catch (error) {
console.error('Error streaming error event:', error)
}
}
streamMetadataEvent(chatId: string, apiResponse: any) {
try {
const metadataJson: any = {}
if (apiResponse.chatId) {
metadataJson['chatId'] = apiResponse.chatId
}
if (apiResponse.chatMessageId) {
metadataJson['chatMessageId'] = apiResponse.chatMessageId
}
if (apiResponse.question) {
metadataJson['question'] = apiResponse.question
}
if (apiResponse.sessionId) {
metadataJson['sessionId'] = apiResponse.sessionId
}
if (apiResponse.memoryType) {
metadataJson['memoryType'] = apiResponse.memoryType
}
if (Object.keys(metadataJson).length > 0) {
this.streamCustomEvent(chatId, 'metadata', metadataJson)
}
} catch (error) {
console.error('Error streaming metadata event:', error)
}
}
async disconnect() {
if (this.redisPublisher) {
await this.redisPublisher.quit()
}
}
}
@@ -0,0 +1,108 @@
import { createClient } from 'redis'
import { SSEStreamer } from '../utils/SSEStreamer'
export class RedisEventSubscriber {
private redisSubscriber: ReturnType<typeof createClient>
private sseStreamer: SSEStreamer
private subscribedChannels: Set<string> = new Set()
constructor(sseStreamer: SSEStreamer) {
if (process.env.REDIS_URL) {
this.redisSubscriber = createClient({
url: process.env.REDIS_URL
})
} else {
this.redisSubscriber = createClient({
username: process.env.REDIS_USERNAME || undefined,
password: process.env.REDIS_PASSWORD || undefined,
socket: {
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
tls: process.env.REDIS_TLS === 'true',
cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined,
key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined,
ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined
}
})
}
this.sseStreamer = sseStreamer
}
async connect() {
await this.redisSubscriber.connect()
}
subscribe(channel: string) {
// Subscribe to the Redis channel for job events
if (!this.redisSubscriber) {
throw new Error('Redis subscriber not connected.')
}
// Check if already subscribed
if (this.subscribedChannels.has(channel)) {
return // Prevent duplicate subscription
}
this.redisSubscriber.subscribe(channel, (message) => {
this.handleEvent(message)
})
// Mark the channel as subscribed
this.subscribedChannels.add(channel)
}
private handleEvent(message: string) {
// Parse the message from Redis
const event = JSON.parse(message)
const { eventType, chatId, data } = event
// Stream the event to the client
switch (eventType) {
case 'start':
this.sseStreamer.streamStartEvent(chatId, data)
break
case 'token':
this.sseStreamer.streamTokenEvent(chatId, data)
break
case 'sourceDocuments':
this.sseStreamer.streamSourceDocumentsEvent(chatId, data)
break
case 'artifacts':
this.sseStreamer.streamArtifactsEvent(chatId, data)
break
case 'usedTools':
this.sseStreamer.streamUsedToolsEvent(chatId, data)
break
case 'fileAnnotations':
this.sseStreamer.streamFileAnnotationsEvent(chatId, data)
break
case 'tool':
this.sseStreamer.streamToolEvent(chatId, data)
break
case 'agentReasoning':
this.sseStreamer.streamAgentReasoningEvent(chatId, data)
break
case 'nextAgent':
this.sseStreamer.streamNextAgentEvent(chatId, data)
break
case 'action':
this.sseStreamer.streamActionEvent(chatId, data)
break
case 'abort':
this.sseStreamer.streamAbortEvent(chatId)
break
case 'error':
this.sseStreamer.streamErrorEvent(chatId, data)
break
case 'metadata':
this.sseStreamer.streamMetadataEvent(chatId, data)
break
}
}
async disconnect() {
if (this.redisSubscriber) {
await this.redisSubscriber.quit()
}
}
}
+85
View File
@@ -0,0 +1,85 @@
import { DataSource } from 'typeorm'
import {
IComponentNodes,
IExecuteDocStoreUpsert,
IExecuteFlowParams,
IExecutePreviewLoader,
IExecuteProcessLoader,
IExecuteVectorStoreInsert
} from '../Interface'
import { Telemetry } from '../utils/telemetry'
import { CachePool } from '../CachePool'
import { BaseQueue } from './BaseQueue'
import { executeUpsert } from '../utils/upsertVector'
import { executeDocStoreUpsert, insertIntoVectorStore, previewChunks, processLoader } from '../services/documentstore'
import { RedisOptions } from 'bullmq'
import logger from '../utils/logger'
interface UpsertQueueOptions {
appDataSource: DataSource
telemetry: Telemetry
cachePool: CachePool
componentNodes: IComponentNodes
}
export class UpsertQueue extends BaseQueue {
private componentNodes: IComponentNodes
private telemetry: Telemetry
private cachePool: CachePool
private appDataSource: DataSource
private queueName: string
constructor(name: string, connection: RedisOptions, options: UpsertQueueOptions) {
super(name, connection)
this.queueName = name
this.componentNodes = options.componentNodes || {}
this.telemetry = options.telemetry
this.cachePool = options.cachePool
this.appDataSource = options.appDataSource
}
public getQueueName() {
return this.queueName
}
public getQueue() {
return this.queue
}
async processJob(
data: IExecuteFlowParams | IExecuteDocStoreUpsert | IExecuteProcessLoader | IExecuteVectorStoreInsert | IExecutePreviewLoader
) {
if (this.appDataSource) data.appDataSource = this.appDataSource
if (this.telemetry) data.telemetry = this.telemetry
if (this.cachePool) data.cachePool = this.cachePool
if (this.componentNodes) data.componentNodes = this.componentNodes
// document-store/loader/preview
if (Object.prototype.hasOwnProperty.call(data, 'isPreviewOnly')) {
logger.info('Previewing loader...')
return await previewChunks(data as IExecutePreviewLoader)
}
// document-store/loader/process/:loaderId
if (Object.prototype.hasOwnProperty.call(data, 'isProcessWithoutUpsert')) {
logger.info('Processing loader...')
return await processLoader(data as IExecuteProcessLoader)
}
// document-store/vectorstore/insert/:loaderId
if (Object.prototype.hasOwnProperty.call(data, 'isVectorStoreInsert')) {
logger.info('Inserting vector store...')
return await insertIntoVectorStore(data as IExecuteVectorStoreInsert)
}
// document-store/upsert/:storeId
if (Object.prototype.hasOwnProperty.call(data, 'storeId')) {
logger.info('Upserting to vector store via document loader...')
return await executeDocStoreUpsert(data as IExecuteDocStoreUpsert)
}
// upsert-vector/:chatflowid
logger.info('Upserting to vector store via chatflow...')
return await executeUpsert(data as IExecuteFlowParams)
}
}