From 057e056257857acba9afc25995a0b00dde8b5a27 Mon Sep 17 00:00:00 2001 From: Henry Heng Date: Tue, 9 Apr 2024 14:36:24 +0100 Subject: [PATCH] Bugfix/Add missing TTL implementation for Redis (#2131) add missing TTL implementation --- .../RedisBackedChatMemory/RedisBackedChatMemory.ts | 8 +++++++- .../UpstashRedisBackedChatMemory.ts | 12 ++++++++++-- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/packages/components/nodes/memory/RedisBackedChatMemory/RedisBackedChatMemory.ts b/packages/components/nodes/memory/RedisBackedChatMemory/RedisBackedChatMemory.ts index 7cd6d5f1..b9f27a90 100644 --- a/packages/components/nodes/memory/RedisBackedChatMemory/RedisBackedChatMemory.ts +++ b/packages/components/nodes/memory/RedisBackedChatMemory/RedisBackedChatMemory.ts @@ -162,7 +162,8 @@ const initalizeRedis = async (nodeData: INodeData, options: ICommonObject): Prom chatHistory: redisChatMessageHistory, sessionId, windowSize, - redisClient: client + redisClient: client, + sessionTTL }) return memory @@ -172,18 +173,21 @@ interface BufferMemoryExtendedInput { redisClient: Redis sessionId: string windowSize?: number + sessionTTL?: number } class BufferMemoryExtended extends FlowiseMemory implements MemoryMethods { sessionId = '' redisClient: Redis windowSize?: number + sessionTTL?: number constructor(fields: BufferMemoryInput & BufferMemoryExtendedInput) { super(fields) this.sessionId = fields.sessionId this.redisClient = fields.redisClient this.windowSize = fields.windowSize + this.sessionTTL = fields.sessionTTL } async getChatMessages(overrideSessionId = '', returnBaseMessages = false): Promise { @@ -207,12 +211,14 @@ class BufferMemoryExtended extends FlowiseMemory implements MemoryMethods { const newInputMessage = new HumanMessage(input.text) const messageToAdd = [newInputMessage].map((msg) => msg.toDict()) await this.redisClient.lpush(id, JSON.stringify(messageToAdd[0])) + if (this.sessionTTL) await this.redisClient.expire(id, this.sessionTTL) } if (output) { const newOutputMessage = new AIMessage(output.text) const messageToAdd = [newOutputMessage].map((msg) => msg.toDict()) await this.redisClient.lpush(id, JSON.stringify(messageToAdd[0])) + if (this.sessionTTL) await this.redisClient.expire(id, this.sessionTTL) } } diff --git a/packages/components/nodes/memory/UpstashRedisBackedChatMemory/UpstashRedisBackedChatMemory.ts b/packages/components/nodes/memory/UpstashRedisBackedChatMemory/UpstashRedisBackedChatMemory.ts index 52da0f37..617aa71d 100644 --- a/packages/components/nodes/memory/UpstashRedisBackedChatMemory/UpstashRedisBackedChatMemory.ts +++ b/packages/components/nodes/memory/UpstashRedisBackedChatMemory/UpstashRedisBackedChatMemory.ts @@ -88,8 +88,10 @@ class UpstashRedisBackedChatMemory_Memory implements INode { const initalizeUpstashRedis = async (nodeData: INodeData, options: ICommonObject): Promise => { const baseURL = nodeData.inputs?.baseURL as string - const sessionTTL = nodeData.inputs?.sessionTTL as string const sessionId = nodeData.inputs?.sessionId as string + const _sessionTTL = nodeData.inputs?.sessionTTL as string + + const sessionTTL = _sessionTTL ? parseInt(_sessionTTL, 10) : undefined const credentialData = await getCredentialData(nodeData.credential ?? '', options) const upstashRestToken = getCredentialParam('upstashRestToken', credentialData, nodeData) @@ -101,7 +103,7 @@ const initalizeUpstashRedis = async (nodeData: INodeData, options: ICommonObject const redisChatMessageHistory = new UpstashRedisChatMessageHistory({ sessionId, - sessionTTL: sessionTTL ? parseInt(sessionTTL, 10) : undefined, + sessionTTL, client }) @@ -109,6 +111,7 @@ const initalizeUpstashRedis = async (nodeData: INodeData, options: ICommonObject memoryKey: 'chat_history', chatHistory: redisChatMessageHistory, sessionId, + sessionTTL, redisClient: client }) @@ -118,16 +121,19 @@ const initalizeUpstashRedis = async (nodeData: INodeData, options: ICommonObject interface BufferMemoryExtendedInput { redisClient: Redis sessionId: string + sessionTTL?: number } class BufferMemoryExtended extends FlowiseMemory implements MemoryMethods { sessionId = '' redisClient: Redis + sessionTTL?: number constructor(fields: BufferMemoryInput & BufferMemoryExtendedInput) { super(fields) this.sessionId = fields.sessionId this.redisClient = fields.redisClient + this.sessionTTL = fields.sessionTTL } async getChatMessages(overrideSessionId = '', returnBaseMessages = false): Promise { @@ -152,12 +158,14 @@ class BufferMemoryExtended extends FlowiseMemory implements MemoryMethods { const newInputMessage = new HumanMessage(input.text) const messageToAdd = [newInputMessage].map((msg) => msg.toDict()) await this.redisClient.lpush(id, JSON.stringify(messageToAdd[0])) + if (this.sessionTTL) await this.redisClient.expire(id, this.sessionTTL) } if (output) { const newOutputMessage = new AIMessage(output.text) const messageToAdd = [newOutputMessage].map((msg) => msg.toDict()) await this.redisClient.lpush(id, JSON.stringify(messageToAdd[0])) + if (this.sessionTTL) await this.redisClient.expire(id, this.sessionTTL) } }