Merge pull request #1036 from FlowiseAI/feature/InMemoryCache

Feature/In-Mem LLMcache
This commit is contained in:
Henry Heng
2023-10-12 12:21:13 +01:00
committed by GitHub
11 changed files with 147 additions and 3 deletions
@@ -4,12 +4,15 @@ class UpstashRedisApi implements INodeCredential {
label: string label: string
name: string name: string
version: number version: number
description: string
inputs: INodeParams[] inputs: INodeParams[]
constructor() { constructor() {
this.label = 'Upstash Redis API' this.label = 'Upstash Redis API'
this.name = 'upstashRedisApi' this.name = 'upstashRedisApi'
this.version = 1.0 this.version = 1.0
this.description =
'Refer to <a target="_blank" href="https://upstash.com/docs/redis/overall/getstarted">official guide</a> on how to create redis instance and get redis REST URL and Token'
this.inputs = [ this.inputs = [
{ {
label: 'Upstash Redis REST URL', label: 'Upstash Redis REST URL',
@@ -0,0 +1,65 @@
import { getBaseClasses, ICommonObject, INode, INodeData, INodeParams } from '../../../src'
import { BaseCache } from 'langchain/schema'
import hash from 'object-hash'
class InMemoryCache implements INode {
label: string
name: string
version: number
description: string
type: string
icon: string
category: string
baseClasses: string[]
inputs: INodeParams[]
credential: INodeParams
constructor() {
this.label = 'InMemory Cache'
this.name = 'inMemoryCache'
this.version = 1.0
this.type = 'InMemoryCache'
this.description = 'Cache LLM response in memory, will be cleared once app restarted'
this.icon = 'inmemorycache.png'
this.category = 'Cache'
this.baseClasses = [this.type, ...getBaseClasses(InMemoryCacheExtended)]
this.inputs = []
}
async init(nodeData: INodeData, _: string, options: ICommonObject): Promise<any> {
const memoryMap = options.cachePool.getLLMCache(options.chatflowid) ?? new Map()
const inMemCache = new InMemoryCacheExtended(memoryMap)
inMemCache.lookup = async (prompt: string, llmKey: string): Promise<any | null> => {
const memory = options.cachePool.getLLMCache(options.chatflowid) ?? inMemCache.cache
return Promise.resolve(memory.get(getCacheKey(prompt, llmKey)) ?? null)
}
inMemCache.update = async (prompt: string, llmKey: string, value: any): Promise<void> => {
inMemCache.cache.set(getCacheKey(prompt, llmKey), value)
options.cachePool.addLLMCache(options.chatflowid, inMemCache.cache)
}
return inMemCache
}
}
const getCacheKey = (...strings: string[]): string => hash(strings.join('_'))
class InMemoryCacheExtended extends BaseCache {
cache: Map<string, any>
constructor(map: Map<string, any>) {
super()
this.cache = map
}
lookup(prompt: string, llmKey: string): Promise<any | null> {
return Promise.resolve(this.cache.get(getCacheKey(prompt, llmKey)) ?? null)
}
async update(prompt: string, llmKey: string, value: any): Promise<void> {
this.cache.set(getCacheKey(prompt, llmKey), value)
}
}
module.exports = { nodeClass: InMemoryCache }
Binary file not shown.

After

Width:  |  Height:  |  Size: 12 KiB

@@ -19,6 +19,7 @@ class MomentoCache implements INode {
this.name = 'momentoCache' this.name = 'momentoCache'
this.version = 1.0 this.version = 1.0
this.type = 'MomentoCache' this.type = 'MomentoCache'
this.description = 'Cache LLM response using Momento, a distributed, serverless cache'
this.icon = 'momento.png' this.icon = 'momento.png'
this.category = 'Cache' this.category = 'Cache'
this.baseClasses = [this.type, ...getBaseClasses(LangchainMomentoCache)] this.baseClasses = [this.type, ...getBaseClasses(LangchainMomentoCache)]
@@ -19,6 +19,7 @@ class RedisCache implements INode {
this.name = 'redisCache' this.name = 'redisCache'
this.version = 1.0 this.version = 1.0
this.type = 'RedisCache' this.type = 'RedisCache'
this.description = 'Cache LLM response in Redis, useful for sharing cache across multiple processes or servers'
this.icon = 'redis.svg' this.icon = 'redis.svg'
this.category = 'Cache' this.category = 'Cache'
this.baseClasses = [this.type, ...getBaseClasses(LangchainRedisCache)] this.baseClasses = [this.type, ...getBaseClasses(LangchainRedisCache)]
@@ -18,6 +18,7 @@ class UpstashRedisCache implements INode {
this.name = 'upstashRedisCache' this.name = 'upstashRedisCache'
this.version = 1.0 this.version = 1.0
this.type = 'UpstashRedisCache' this.type = 'UpstashRedisCache'
this.description = 'Cache LLM response in Upstash Redis, serverless data for Redis and Kafka'
this.icon = 'upstash.png' this.icon = 'upstash.png'
this.category = 'Cache' this.category = 'Cache'
this.baseClasses = [this.type, ...getBaseClasses(LangchainUpstashRedisCache)] this.baseClasses = [this.type, ...getBaseClasses(LangchainUpstashRedisCache)]
+2
View File
@@ -57,6 +57,7 @@
"node-fetch": "^2.6.11", "node-fetch": "^2.6.11",
"node-html-markdown": "^1.3.0", "node-html-markdown": "^1.3.0",
"notion-to-md": "^3.1.1", "notion-to-md": "^3.1.1",
"object-hash": "^3.0.0",
"pdf-parse": "^1.1.1", "pdf-parse": "^1.1.1",
"pdfjs-dist": "^3.7.107", "pdfjs-dist": "^3.7.107",
"pg": "^8.11.2", "pg": "^8.11.2",
@@ -73,6 +74,7 @@
"devDependencies": { "devDependencies": {
"@types/gulp": "4.0.9", "@types/gulp": "4.0.9",
"@types/node-fetch": "2.6.2", "@types/node-fetch": "2.6.2",
"@types/object-hash": "^3.0.2",
"@types/pg": "^8.10.2", "@types/pg": "^8.10.2",
"@types/ws": "^8.5.3", "@types/ws": "^8.5.3",
"gulp": "^4.0.2", "gulp": "^4.0.2",
+53
View File
@@ -0,0 +1,53 @@
import { IActiveCache } from './Interface'
/**
* This pool is to keep track of in-memory cache used for LLM and Embeddings
*/
export class CachePool {
activeLLMCache: IActiveCache = {}
activeEmbeddingCache: IActiveCache = {}
/**
* Add to the llm cache pool
* @param {string} chatflowid
* @param {Map<any, any>} value
*/
addLLMCache(chatflowid: string, value: Map<any, any>) {
this.activeLLMCache[chatflowid] = value
}
/**
* Add to the embedding cache pool
* @param {string} chatflowid
* @param {Map<any, any>} value
*/
addEmbeddingCache(chatflowid: string, value: Map<any, any>) {
this.activeEmbeddingCache[chatflowid] = value
}
/**
* Get item from llm cache pool
* @param {string} chatflowid
*/
getLLMCache(chatflowid: string): Map<any, any> | undefined {
return this.activeLLMCache[chatflowid]
}
/**
* Get item from embedding cache pool
* @param {string} chatflowid
*/
getEmbeddingCache(chatflowid: string): Map<any, any> | undefined {
return this.activeEmbeddingCache[chatflowid]
}
}
let cachePoolInstance: CachePool | undefined
export function getInstance(): CachePool {
if (cachePoolInstance === undefined) {
cachePoolInstance = new CachePool()
}
return cachePoolInstance
}
+4
View File
@@ -157,6 +157,10 @@ export interface IActiveChatflows {
} }
} }
export interface IActiveCache {
[key: string]: Map<any, any>
}
export interface IOverrideConfig { export interface IOverrideConfig {
node: string node: string
nodeId: string nodeId: string
+8 -1
View File
@@ -53,6 +53,7 @@ import { ChatMessage } from './database/entities/ChatMessage'
import { Credential } from './database/entities/Credential' import { Credential } from './database/entities/Credential'
import { Tool } from './database/entities/Tool' import { Tool } from './database/entities/Tool'
import { ChatflowPool } from './ChatflowPool' import { ChatflowPool } from './ChatflowPool'
import { CachePool } from './CachePool'
import { ICommonObject, INodeOptionsValue } from 'flowise-components' import { ICommonObject, INodeOptionsValue } from 'flowise-components'
import { createRateLimiter, getRateLimiter, initializeRateLimiter } from './utils/rateLimit' import { createRateLimiter, getRateLimiter, initializeRateLimiter } from './utils/rateLimit'
@@ -60,6 +61,7 @@ export class App {
app: express.Application app: express.Application
nodesPool: NodesPool nodesPool: NodesPool
chatflowPool: ChatflowPool chatflowPool: ChatflowPool
cachePool: CachePool
AppDataSource = getDataSource() AppDataSource = getDataSource()
constructor() { constructor() {
@@ -91,6 +93,9 @@ export class App {
// Initialize Rate Limit // Initialize Rate Limit
const AllChatFlow: IChatFlow[] = await getAllChatFlow() const AllChatFlow: IChatFlow[] = await getAllChatFlow()
await initializeRateLimiter(AllChatFlow) await initializeRateLimiter(AllChatFlow)
// Initialize cache pool
this.cachePool = new CachePool()
}) })
.catch((err) => { .catch((err) => {
logger.error('❌ [server]: Error during Data Source initialization:', err) logger.error('❌ [server]: Error during Data Source initialization:', err)
@@ -944,8 +949,10 @@ export class App {
incomingInput.question, incomingInput.question,
incomingInput.history, incomingInput.history,
chatId, chatId,
chatflowid,
this.AppDataSource, this.AppDataSource,
incomingInput?.overrideConfig incomingInput?.overrideConfig,
this.cachePool
) )
const nodeToExecute = reactFlowNodes.find((node: IReactFlowNode) => node.id === endingNodeId) const nodeToExecute = reactFlowNodes.find((node: IReactFlowNode) => node.id === endingNodeId)
+9 -2
View File
@@ -35,6 +35,7 @@ import { ChatMessage } from '../database/entities/ChatMessage'
import { Credential } from '../database/entities/Credential' import { Credential } from '../database/entities/Credential'
import { Tool } from '../database/entities/Tool' import { Tool } from '../database/entities/Tool'
import { DataSource } from 'typeorm' import { DataSource } from 'typeorm'
import { CachePool } from '../CachePool'
const QUESTION_VAR_PREFIX = 'question' const QUESTION_VAR_PREFIX = 'question'
const CHAT_HISTORY_VAR_PREFIX = 'chat_history' const CHAT_HISTORY_VAR_PREFIX = 'chat_history'
@@ -197,8 +198,10 @@ export const getEndingNode = (nodeDependencies: INodeDependencies, graph: INodeD
* @param {IComponentNodes} componentNodes * @param {IComponentNodes} componentNodes
* @param {string} question * @param {string} question
* @param {string} chatId * @param {string} chatId
* @param {string} chatflowid
* @param {DataSource} appDataSource * @param {DataSource} appDataSource
* @param {ICommonObject} overrideConfig * @param {ICommonObject} overrideConfig
* @param {CachePool} cachePool
*/ */
export const buildLangchain = async ( export const buildLangchain = async (
startingNodeIds: string[], startingNodeIds: string[],
@@ -209,8 +212,10 @@ export const buildLangchain = async (
question: string, question: string,
chatHistory: IMessage[], chatHistory: IMessage[],
chatId: string, chatId: string,
chatflowid: string,
appDataSource: DataSource, appDataSource: DataSource,
overrideConfig?: ICommonObject overrideConfig?: ICommonObject,
cachePool?: CachePool
) => { ) => {
const flowNodes = cloneDeep(reactFlowNodes) const flowNodes = cloneDeep(reactFlowNodes)
@@ -245,9 +250,11 @@ export const buildLangchain = async (
logger.debug(`[server]: Initializing ${reactFlowNode.data.label} (${reactFlowNode.data.id})`) logger.debug(`[server]: Initializing ${reactFlowNode.data.label} (${reactFlowNode.data.id})`)
flowNodes[nodeIndex].data.instance = await newNodeInstance.init(reactFlowNodeData, question, { flowNodes[nodeIndex].data.instance = await newNodeInstance.init(reactFlowNodeData, question, {
chatId, chatId,
chatflowid,
appDataSource, appDataSource,
databaseEntities, databaseEntities,
logger logger,
cachePool
}) })
logger.debug(`[server]: Finished initializing ${reactFlowNode.data.label} (${reactFlowNode.data.id})`) logger.debug(`[server]: Finished initializing ${reactFlowNode.data.label} (${reactFlowNode.data.id})`)
} catch (e: any) { } catch (e: any) {