mirror of
https://github.com/farcasclaudiu/Flowise.git
synced 2026-06-29 09:01:06 +03:00
init rateLimiter
This commit is contained in:
@@ -46,6 +46,7 @@
|
|||||||
"license": "SEE LICENSE IN LICENSE.md",
|
"license": "SEE LICENSE IN LICENSE.md",
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"@oclif/core": "^1.13.10",
|
"@oclif/core": "^1.13.10",
|
||||||
|
"async-mutex": "^0.4.0",
|
||||||
"axios": "^0.27.2",
|
"axios": "^0.27.2",
|
||||||
"cors": "^2.8.5",
|
"cors": "^2.8.5",
|
||||||
"crypto-js": "^4.1.1",
|
"crypto-js": "^4.1.1",
|
||||||
|
|||||||
@@ -15,6 +15,9 @@ export interface IChatFlow {
|
|||||||
isPublic?: boolean
|
isPublic?: boolean
|
||||||
apikeyid?: string
|
apikeyid?: string
|
||||||
chatbotConfig?: string
|
chatbotConfig?: string
|
||||||
|
rateLimit?: number
|
||||||
|
rateLimitDuration?: number
|
||||||
|
rateLimitMsg?: string
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface IChatMessage {
|
export interface IChatMessage {
|
||||||
|
|||||||
@@ -25,6 +25,15 @@ export class ChatFlow implements IChatFlow {
|
|||||||
@Column({ nullable: true })
|
@Column({ nullable: true })
|
||||||
chatbotConfig?: string
|
chatbotConfig?: string
|
||||||
|
|
||||||
|
@Column({ nullable: true })
|
||||||
|
rateLimit?: number
|
||||||
|
|
||||||
|
@Column({ nullable: true })
|
||||||
|
rateLimitDuration?: number
|
||||||
|
|
||||||
|
@Column({ nullable: true })
|
||||||
|
rateLimitMsg?: string
|
||||||
|
|
||||||
@CreateDateColumn()
|
@CreateDateColumn()
|
||||||
createdDate: Date
|
createdDate: Date
|
||||||
|
|
||||||
|
|||||||
@@ -54,7 +54,7 @@ import { Credential } from './entity/Credential'
|
|||||||
import { Tool } from './entity/Tool'
|
import { Tool } from './entity/Tool'
|
||||||
import { ChatflowPool } from './ChatflowPool'
|
import { ChatflowPool } from './ChatflowPool'
|
||||||
import { ICommonObject, INodeOptionsValue } from 'flowise-components'
|
import { ICommonObject, INodeOptionsValue } from 'flowise-components'
|
||||||
import { createRateLimiter, getRateLimiter } from './utils/rateLimit'
|
import { createRateLimiter, getRateLimiter, initializeRateLimiter } from './utils/rateLimit'
|
||||||
|
|
||||||
export class App {
|
export class App {
|
||||||
app: express.Application
|
app: express.Application
|
||||||
@@ -84,6 +84,10 @@ export class App {
|
|||||||
|
|
||||||
// Initialize encryption key
|
// Initialize encryption key
|
||||||
await getEncryptionKey()
|
await getEncryptionKey()
|
||||||
|
|
||||||
|
// Initialize Rate Limit
|
||||||
|
const AllChatFlow: IChatFlow[] = await getAllChatFlow()
|
||||||
|
await initializeRateLimiter(AllChatFlow)
|
||||||
})
|
})
|
||||||
.catch((err) => {
|
.catch((err) => {
|
||||||
logger.error('❌ [server]: Error during Data Source initialization:', err)
|
logger.error('❌ [server]: Error during Data Source initialization:', err)
|
||||||
@@ -246,7 +250,7 @@ export class App {
|
|||||||
|
|
||||||
// Get all chatflows
|
// Get all chatflows
|
||||||
this.app.get('/api/v1/chatflows', async (req: Request, res: Response) => {
|
this.app.get('/api/v1/chatflows', async (req: Request, res: Response) => {
|
||||||
const chatflows: IChatFlow[] = await this.AppDataSource.getRepository(ChatFlow).find()
|
const chatflows: IChatFlow[] = await getAllChatFlow()
|
||||||
return res.json(chatflows)
|
return res.json(chatflows)
|
||||||
})
|
})
|
||||||
|
|
||||||
@@ -655,21 +659,6 @@ export class App {
|
|||||||
// Prediction
|
// Prediction
|
||||||
// ----------------------------------------
|
// ----------------------------------------
|
||||||
|
|
||||||
this.app.get(
|
|
||||||
'/api/v1/rate-limit/:id',
|
|
||||||
upload.array('files'),
|
|
||||||
(req: Request, res: Response, next: NextFunction) => getRateLimiter(req, res, next),
|
|
||||||
// specificRouteLimiter,
|
|
||||||
async (req: Request, res: Response) => {
|
|
||||||
res.send("you're fine")
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
this.app.post('/api/v1/rate-limit/', async (req: Request, res: Response) => {
|
|
||||||
createRateLimiter(req)
|
|
||||||
res.send('Created/Updated rate limit')
|
|
||||||
})
|
|
||||||
|
|
||||||
// Send input message and get prediction result (External)
|
// Send input message and get prediction result (External)
|
||||||
this.app.post('/api/v1/prediction/:id', upload.array('files'), async (req: Request, res: Response) => {
|
this.app.post('/api/v1/prediction/:id', upload.array('files'), async (req: Request, res: Response) => {
|
||||||
await this.processPrediction(req, res, socketIO)
|
await this.processPrediction(req, res, socketIO)
|
||||||
@@ -768,6 +757,39 @@ export class App {
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
// ----------------------------------------
|
||||||
|
// Rate Limit
|
||||||
|
// ----------------------------------------
|
||||||
|
|
||||||
|
this.app.get(
|
||||||
|
'/api/v1/rate-limit/:id',
|
||||||
|
upload.array('files'),
|
||||||
|
(req: Request, res: Response, next: NextFunction) => getRateLimiter(req, res, next),
|
||||||
|
// specificRouteLimiter,
|
||||||
|
async (req: Request, res: Response) => {
|
||||||
|
res.send("you're fine")
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
this.app.post('/api/v1/rate-limit/', async (req: Request, res: Response) => {
|
||||||
|
const id = req.body.id
|
||||||
|
const duration = req.body.duration
|
||||||
|
const limit = req.body.limit
|
||||||
|
const message = req.body.message
|
||||||
|
|
||||||
|
const result = await getDataSource()
|
||||||
|
.getRepository(ChatFlow)
|
||||||
|
.createQueryBuilder()
|
||||||
|
.update(ChatFlow)
|
||||||
|
.set({ rateLimit: limit, rateLimitDuration: duration, rateLimitMsg: message })
|
||||||
|
.where('id = :id', { id: id })
|
||||||
|
.execute()
|
||||||
|
|
||||||
|
await createRateLimiter(id, Number(duration), Number(limit), message)
|
||||||
|
|
||||||
|
res.send({ result })
|
||||||
|
})
|
||||||
|
|
||||||
// ----------------------------------------
|
// ----------------------------------------
|
||||||
// Serve UI static
|
// Serve UI static
|
||||||
// ----------------------------------------
|
// ----------------------------------------
|
||||||
@@ -1012,6 +1034,10 @@ export async function getChatId(chatflowid: string) {
|
|||||||
|
|
||||||
let serverApp: App | undefined
|
let serverApp: App | undefined
|
||||||
|
|
||||||
|
export async function getAllChatFlow(): Promise<IChatFlow[]> {
|
||||||
|
return await getDataSource().getRepository(ChatFlow).find()
|
||||||
|
}
|
||||||
|
|
||||||
export async function start(): Promise<void> {
|
export async function start(): Promise<void> {
|
||||||
serverApp = new App()
|
serverApp = new App()
|
||||||
|
|
||||||
|
|||||||
@@ -1,56 +1,39 @@
|
|||||||
import { NextFunction, Request, Response } from 'express'
|
import { NextFunction, Request, Response } from 'express'
|
||||||
import { rateLimit, RateLimitRequestHandler } from 'express-rate-limit'
|
import { rateLimit, RateLimitRequestHandler } from 'express-rate-limit'
|
||||||
|
import { IChatFlow } from '../Interface'
|
||||||
|
import { Mutex } from 'async-mutex'
|
||||||
|
|
||||||
interface RateLimit {
|
let rateLimiters: Record<string, RateLimitRequestHandler> = {}
|
||||||
id: string
|
const rateLimiterMutex = new Mutex()
|
||||||
rateLimitObj: RateLimitRequestHandler
|
|
||||||
}
|
|
||||||
|
|
||||||
export const specificRouteLimiter: RateLimitRequestHandler = rateLimit({
|
export async function createRateLimiter(id: string, duration: number, limit: number, message: string) {
|
||||||
windowMs: 1 * 60 * 1000, // 15 minutes
|
const release = await rateLimiterMutex.acquire()
|
||||||
max: 1, // Limit each IP to 100 requests per windowMs
|
try {
|
||||||
message: 'Too many requests, please try again later.'
|
rateLimiters[id] = rateLimit({
|
||||||
})
|
windowMs: duration,
|
||||||
|
max: limit,
|
||||||
let rateLimiters: RateLimit[] = []
|
handler: (req, res) => {
|
||||||
|
res.status(429).json({ error: message })
|
||||||
export function createRateLimiter(req: Request) {
|
}
|
||||||
const id = req.body.id
|
|
||||||
const duration = req.body.duration
|
|
||||||
const limit = req.body.limit
|
|
||||||
const message = req.body.message
|
|
||||||
|
|
||||||
const rateLimitObj: RateLimitRequestHandler = rateLimit({
|
|
||||||
windowMs: Number(duration),
|
|
||||||
max: limit,
|
|
||||||
handler: (req, res) => {
|
|
||||||
res.status(429).json({ error: message })
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
const existingIndex: number = rateLimiters.findIndex((rateLimit) => rateLimit.id === id)
|
|
||||||
|
|
||||||
if (existingIndex === -1) {
|
|
||||||
rateLimiters.push({
|
|
||||||
id,
|
|
||||||
rateLimitObj
|
|
||||||
})
|
})
|
||||||
} else {
|
} finally {
|
||||||
rateLimiters[existingIndex] = {
|
release()
|
||||||
id,
|
|
||||||
rateLimitObj
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export function getRateLimiter(req: Request, res: Response, next: NextFunction) {
|
export function getRateLimiter(req: Request, res: Response, next: NextFunction) {
|
||||||
const id = req.params.id
|
const id = req.params.id
|
||||||
|
|
||||||
const ratelimiter = rateLimiters.find((rateLimit) => rateLimit.id === id)
|
if (!rateLimiters[id]) return next()
|
||||||
|
|
||||||
if (!ratelimiter) return next()
|
const idRateLimiter = rateLimiters[id]
|
||||||
|
|
||||||
const idRateLimiter = ratelimiter.rateLimitObj
|
|
||||||
|
|
||||||
return idRateLimiter(req, res, next)
|
return idRateLimiter(req, res, next)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export async function initializeRateLimiter(ChatFlowPool: IChatFlow[]) {
|
||||||
|
await ChatFlowPool.map(async (ChatFlow) => {
|
||||||
|
if (ChatFlow.rateLimitDuration && ChatFlow.rateLimit && ChatFlow.rateLimitMsg)
|
||||||
|
await createRateLimiter(ChatFlow.id, ChatFlow.rateLimitDuration, ChatFlow.rateLimit, ChatFlow.rateLimitMsg)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user