mirror of
https://github.com/farcasclaudiu/Flowise.git
synced 2026-06-28 19:00:59 +03:00
add in-mem llm cache
This commit is contained in:
@@ -0,0 +1,64 @@
|
|||||||
|
import { getBaseClasses, ICommonObject, INode, INodeData, INodeParams } from '../../../src'
|
||||||
|
import { BaseCache } from 'langchain/schema'
|
||||||
|
import hash from 'object-hash'
|
||||||
|
|
||||||
|
class InMemoryCache implements INode {
|
||||||
|
label: string
|
||||||
|
name: string
|
||||||
|
version: number
|
||||||
|
description: string
|
||||||
|
type: string
|
||||||
|
icon: string
|
||||||
|
category: string
|
||||||
|
baseClasses: string[]
|
||||||
|
inputs: INodeParams[]
|
||||||
|
credential: INodeParams
|
||||||
|
|
||||||
|
constructor() {
|
||||||
|
this.label = 'InMemory Cache'
|
||||||
|
this.name = 'inMemoryCache'
|
||||||
|
this.version = 1.0
|
||||||
|
this.type = 'InMemoryCache'
|
||||||
|
this.icon = 'inmemorycache.png'
|
||||||
|
this.category = 'Cache'
|
||||||
|
this.baseClasses = [this.type, ...getBaseClasses(InMemoryCacheExtended)]
|
||||||
|
this.inputs = []
|
||||||
|
}
|
||||||
|
|
||||||
|
async init(nodeData: INodeData, _: string, options: ICommonObject): Promise<any> {
|
||||||
|
const memoryMap = options.cachePool.getLLMCache(options.chatflowid) ?? new Map()
|
||||||
|
const inMemCache = new InMemoryCacheExtended(memoryMap)
|
||||||
|
|
||||||
|
inMemCache.lookup = async (prompt: string, llmKey: string): Promise<any | null> => {
|
||||||
|
const memory = options.cachePool.getLLMCache(options.chatflowid) ?? inMemCache.cache
|
||||||
|
return Promise.resolve(memory.get(getCacheKey(prompt, llmKey)) ?? null)
|
||||||
|
}
|
||||||
|
|
||||||
|
inMemCache.update = async (prompt: string, llmKey: string, value: any): Promise<void> => {
|
||||||
|
inMemCache.cache.set(getCacheKey(prompt, llmKey), value)
|
||||||
|
options.cachePool.addLLMCache(options.chatflowid, inMemCache.cache)
|
||||||
|
}
|
||||||
|
return inMemCache
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const getCacheKey = (...strings: string[]): string => hash(strings.join('_'))
|
||||||
|
|
||||||
|
class InMemoryCacheExtended extends BaseCache {
|
||||||
|
cache: Map<string, any>
|
||||||
|
|
||||||
|
constructor(map: Map<string, any>) {
|
||||||
|
super()
|
||||||
|
this.cache = map
|
||||||
|
}
|
||||||
|
|
||||||
|
lookup(prompt: string, llmKey: string): Promise<any | null> {
|
||||||
|
return Promise.resolve(this.cache.get(getCacheKey(prompt, llmKey)) ?? null)
|
||||||
|
}
|
||||||
|
|
||||||
|
async update(prompt: string, llmKey: string, value: any): Promise<void> {
|
||||||
|
this.cache.set(getCacheKey(prompt, llmKey), value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = { nodeClass: InMemoryCache }
|
||||||
Binary file not shown.
|
After Width: | Height: | Size: 12 KiB |
@@ -57,6 +57,7 @@
|
|||||||
"node-fetch": "^2.6.11",
|
"node-fetch": "^2.6.11",
|
||||||
"node-html-markdown": "^1.3.0",
|
"node-html-markdown": "^1.3.0",
|
||||||
"notion-to-md": "^3.1.1",
|
"notion-to-md": "^3.1.1",
|
||||||
|
"object-hash": "^3.0.0",
|
||||||
"pdf-parse": "^1.1.1",
|
"pdf-parse": "^1.1.1",
|
||||||
"pdfjs-dist": "^3.7.107",
|
"pdfjs-dist": "^3.7.107",
|
||||||
"pg": "^8.11.2",
|
"pg": "^8.11.2",
|
||||||
@@ -73,6 +74,7 @@
|
|||||||
"devDependencies": {
|
"devDependencies": {
|
||||||
"@types/gulp": "4.0.9",
|
"@types/gulp": "4.0.9",
|
||||||
"@types/node-fetch": "2.6.2",
|
"@types/node-fetch": "2.6.2",
|
||||||
|
"@types/object-hash": "^3.0.2",
|
||||||
"@types/pg": "^8.10.2",
|
"@types/pg": "^8.10.2",
|
||||||
"@types/ws": "^8.5.3",
|
"@types/ws": "^8.5.3",
|
||||||
"gulp": "^4.0.2",
|
"gulp": "^4.0.2",
|
||||||
|
|||||||
@@ -0,0 +1,53 @@
|
|||||||
|
import { IActiveCache } from './Interface'
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This pool is to keep track of in-memory cache used for LLM and Embeddings
|
||||||
|
*/
|
||||||
|
export class CachePool {
|
||||||
|
activeLLMCache: IActiveCache = {}
|
||||||
|
activeEmbeddingCache: IActiveCache = {}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add to the llm cache pool
|
||||||
|
* @param {string} chatflowid
|
||||||
|
* @param {Map<any, any>} value
|
||||||
|
*/
|
||||||
|
addLLMCache(chatflowid: string, value: Map<any, any>) {
|
||||||
|
this.activeLLMCache[chatflowid] = value
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add to the embedding cache pool
|
||||||
|
* @param {string} chatflowid
|
||||||
|
* @param {Map<any, any>} value
|
||||||
|
*/
|
||||||
|
addEmbeddingCache(chatflowid: string, value: Map<any, any>) {
|
||||||
|
this.activeEmbeddingCache[chatflowid] = value
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get item from llm cache pool
|
||||||
|
* @param {string} chatflowid
|
||||||
|
*/
|
||||||
|
getLLMCache(chatflowid: string): Map<any, any> | undefined {
|
||||||
|
return this.activeLLMCache[chatflowid]
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get item from embedding cache pool
|
||||||
|
* @param {string} chatflowid
|
||||||
|
*/
|
||||||
|
getEmbeddingCache(chatflowid: string): Map<any, any> | undefined {
|
||||||
|
return this.activeEmbeddingCache[chatflowid]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let cachePoolInstance: CachePool | undefined
|
||||||
|
|
||||||
|
export function getInstance(): CachePool {
|
||||||
|
if (cachePoolInstance === undefined) {
|
||||||
|
cachePoolInstance = new CachePool()
|
||||||
|
}
|
||||||
|
|
||||||
|
return cachePoolInstance
|
||||||
|
}
|
||||||
@@ -157,6 +157,10 @@ export interface IActiveChatflows {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export interface IActiveCache {
|
||||||
|
[key: string]: Map<any, any>
|
||||||
|
}
|
||||||
|
|
||||||
export interface IOverrideConfig {
|
export interface IOverrideConfig {
|
||||||
node: string
|
node: string
|
||||||
nodeId: string
|
nodeId: string
|
||||||
|
|||||||
@@ -53,6 +53,7 @@ import { ChatMessage } from './database/entities/ChatMessage'
|
|||||||
import { Credential } from './database/entities/Credential'
|
import { Credential } from './database/entities/Credential'
|
||||||
import { Tool } from './database/entities/Tool'
|
import { Tool } from './database/entities/Tool'
|
||||||
import { ChatflowPool } from './ChatflowPool'
|
import { ChatflowPool } from './ChatflowPool'
|
||||||
|
import { CachePool } from './CachePool'
|
||||||
import { ICommonObject, INodeOptionsValue } from 'flowise-components'
|
import { ICommonObject, INodeOptionsValue } from 'flowise-components'
|
||||||
import { createRateLimiter, getRateLimiter, initializeRateLimiter } from './utils/rateLimit'
|
import { createRateLimiter, getRateLimiter, initializeRateLimiter } from './utils/rateLimit'
|
||||||
|
|
||||||
@@ -60,6 +61,7 @@ export class App {
|
|||||||
app: express.Application
|
app: express.Application
|
||||||
nodesPool: NodesPool
|
nodesPool: NodesPool
|
||||||
chatflowPool: ChatflowPool
|
chatflowPool: ChatflowPool
|
||||||
|
cachePool: CachePool
|
||||||
AppDataSource = getDataSource()
|
AppDataSource = getDataSource()
|
||||||
|
|
||||||
constructor() {
|
constructor() {
|
||||||
@@ -91,6 +93,9 @@ export class App {
|
|||||||
// Initialize Rate Limit
|
// Initialize Rate Limit
|
||||||
const AllChatFlow: IChatFlow[] = await getAllChatFlow()
|
const AllChatFlow: IChatFlow[] = await getAllChatFlow()
|
||||||
await initializeRateLimiter(AllChatFlow)
|
await initializeRateLimiter(AllChatFlow)
|
||||||
|
|
||||||
|
// Initialize cache pool
|
||||||
|
this.cachePool = new CachePool()
|
||||||
})
|
})
|
||||||
.catch((err) => {
|
.catch((err) => {
|
||||||
logger.error('❌ [server]: Error during Data Source initialization:', err)
|
logger.error('❌ [server]: Error during Data Source initialization:', err)
|
||||||
@@ -944,8 +949,10 @@ export class App {
|
|||||||
incomingInput.question,
|
incomingInput.question,
|
||||||
incomingInput.history,
|
incomingInput.history,
|
||||||
chatId,
|
chatId,
|
||||||
|
chatflowid,
|
||||||
this.AppDataSource,
|
this.AppDataSource,
|
||||||
incomingInput?.overrideConfig
|
incomingInput?.overrideConfig,
|
||||||
|
this.cachePool
|
||||||
)
|
)
|
||||||
|
|
||||||
const nodeToExecute = reactFlowNodes.find((node: IReactFlowNode) => node.id === endingNodeId)
|
const nodeToExecute = reactFlowNodes.find((node: IReactFlowNode) => node.id === endingNodeId)
|
||||||
|
|||||||
@@ -35,6 +35,7 @@ import { ChatMessage } from '../database/entities/ChatMessage'
|
|||||||
import { Credential } from '../database/entities/Credential'
|
import { Credential } from '../database/entities/Credential'
|
||||||
import { Tool } from '../database/entities/Tool'
|
import { Tool } from '../database/entities/Tool'
|
||||||
import { DataSource } from 'typeorm'
|
import { DataSource } from 'typeorm'
|
||||||
|
import { CachePool } from '../CachePool'
|
||||||
|
|
||||||
const QUESTION_VAR_PREFIX = 'question'
|
const QUESTION_VAR_PREFIX = 'question'
|
||||||
const CHAT_HISTORY_VAR_PREFIX = 'chat_history'
|
const CHAT_HISTORY_VAR_PREFIX = 'chat_history'
|
||||||
@@ -197,8 +198,10 @@ export const getEndingNode = (nodeDependencies: INodeDependencies, graph: INodeD
|
|||||||
* @param {IComponentNodes} componentNodes
|
* @param {IComponentNodes} componentNodes
|
||||||
* @param {string} question
|
* @param {string} question
|
||||||
* @param {string} chatId
|
* @param {string} chatId
|
||||||
|
* @param {string} chatflowid
|
||||||
* @param {DataSource} appDataSource
|
* @param {DataSource} appDataSource
|
||||||
* @param {ICommonObject} overrideConfig
|
* @param {ICommonObject} overrideConfig
|
||||||
|
* @param {CachePool} cachePool
|
||||||
*/
|
*/
|
||||||
export const buildLangchain = async (
|
export const buildLangchain = async (
|
||||||
startingNodeIds: string[],
|
startingNodeIds: string[],
|
||||||
@@ -209,8 +212,10 @@ export const buildLangchain = async (
|
|||||||
question: string,
|
question: string,
|
||||||
chatHistory: IMessage[],
|
chatHistory: IMessage[],
|
||||||
chatId: string,
|
chatId: string,
|
||||||
|
chatflowid: string,
|
||||||
appDataSource: DataSource,
|
appDataSource: DataSource,
|
||||||
overrideConfig?: ICommonObject
|
overrideConfig?: ICommonObject,
|
||||||
|
cachePool?: CachePool
|
||||||
) => {
|
) => {
|
||||||
const flowNodes = cloneDeep(reactFlowNodes)
|
const flowNodes = cloneDeep(reactFlowNodes)
|
||||||
|
|
||||||
@@ -245,9 +250,11 @@ export const buildLangchain = async (
|
|||||||
logger.debug(`[server]: Initializing ${reactFlowNode.data.label} (${reactFlowNode.data.id})`)
|
logger.debug(`[server]: Initializing ${reactFlowNode.data.label} (${reactFlowNode.data.id})`)
|
||||||
flowNodes[nodeIndex].data.instance = await newNodeInstance.init(reactFlowNodeData, question, {
|
flowNodes[nodeIndex].data.instance = await newNodeInstance.init(reactFlowNodeData, question, {
|
||||||
chatId,
|
chatId,
|
||||||
|
chatflowid,
|
||||||
appDataSource,
|
appDataSource,
|
||||||
databaseEntities,
|
databaseEntities,
|
||||||
logger
|
logger,
|
||||||
|
cachePool
|
||||||
})
|
})
|
||||||
logger.debug(`[server]: Finished initializing ${reactFlowNode.data.label} (${reactFlowNode.data.id})`)
|
logger.debug(`[server]: Finished initializing ${reactFlowNode.data.label} (${reactFlowNode.data.id})`)
|
||||||
} catch (e: any) {
|
} catch (e: any) {
|
||||||
|
|||||||
Reference in New Issue
Block a user