From e8deeb25cff10b8d17393de3632b6febcee8c336 Mon Sep 17 00:00:00 2001 From: Anuj Verma <42962743+Ashes47@users.noreply.github.com> Date: Sun, 28 Jan 2024 19:45:19 +0530 Subject: [PATCH 1/6] Update MongoDBMemory.ts --- .../memory/MongoDBMemory/MongoDBMemory.ts | 145 +++++++++--------- 1 file changed, 76 insertions(+), 69 deletions(-) diff --git a/packages/components/nodes/memory/MongoDBMemory/MongoDBMemory.ts b/packages/components/nodes/memory/MongoDBMemory/MongoDBMemory.ts index c593c20d..b35de5ae 100644 --- a/packages/components/nodes/memory/MongoDBMemory/MongoDBMemory.ts +++ b/packages/components/nodes/memory/MongoDBMemory/MongoDBMemory.ts @@ -1,9 +1,19 @@ -import { MongoClient, Collection, Document } from 'mongodb' -import { MongoDBChatMessageHistory } from 'langchain/stores/message/mongodb' -import { BufferMemory, BufferMemoryInput } from 'langchain/memory' -import { mapStoredMessageToChatMessage, AIMessage, HumanMessage, BaseMessage } from 'langchain/schema' -import { convertBaseMessagetoIMessage, getBaseClasses, getCredentialData, getCredentialParam } from '../../../src/utils' -import { FlowiseMemory, ICommonObject, IMessage, INode, INodeData, INodeParams, MemoryMethods, MessageType } from '../../../src/Interface' +import { MongoClient, Collection, Document } from 'mongodb'; +import { MongoDBChatMessageHistory } from 'langchain/stores/message/mongodb'; +import { BufferMemory, BufferMemoryInput } from 'langchain/memory'; +import { mapStoredMessageToChatMessage, AIMessage, HumanMessage, BaseMessage } from 'langchain/schema'; +import { convertBaseMessagetoIMessage, getBaseClasses, getCredentialData, getCredentialParam } from '../../../src/utils'; +import { FlowiseMemory, ICommonObject, IMessage, INode, INodeData, INodeParams, MemoryMethods, MessageType } from '../../../src/Interface'; + +let mongoClientSingleton = null; + +const getMongoClient = async (mongoDBConnectUrl) => { + if (!mongoClientSingleton) { + mongoClientSingleton = new MongoClient(mongoDBConnectUrl, { useNewUrlParser: true, useUnifiedTopology: true }); + await mongoClientSingleton.connect(); + } + return mongoClientSingleton; +}; class MongoDB_Memory implements INode { label: string @@ -18,20 +28,20 @@ class MongoDB_Memory implements INode { inputs: INodeParams[] constructor() { - this.label = 'MongoDB Atlas Chat Memory' - this.name = 'MongoDBAtlasChatMemory' - this.version = 1.0 - this.type = 'MongoDBAtlasChatMemory' - this.icon = 'mongodb.svg' - this.category = 'Memory' - this.description = 'Stores the conversation in MongoDB Atlas' - this.baseClasses = [this.type, ...getBaseClasses(BufferMemory)] + this.label = 'MongoDB Atlas Chat Memory'; + this.name = 'MongoDBAtlasChatMemory'; + this.version = 1.0; + this.type = 'MongoDBAtlasChatMemory'; + this.icon = 'mongodb.svg'; + this.category = 'Memory'; + this.description = 'Stores the conversation in MongoDB Atlas'; + this.baseClasses = [this.type, ...getBaseClasses(BufferMemory)]; this.credential = { label: 'Connect Credential', name: 'credential', type: 'credential', credentialNames: ['mongoDBUrlApi'] - } + }; this.inputs = [ { label: 'Database', @@ -49,8 +59,7 @@ class MongoDB_Memory implements INode { label: 'Session Id', name: 'sessionId', type: 'string', - description: - 'If not specified, a random id will be used. Learn more', + description: 'If not specified, a random id will be used. Learn more', default: '', additionalParams: true, optional: true @@ -62,128 +71,126 @@ class MongoDB_Memory implements INode { default: 'chat_history', additionalParams: true } - ] + ]; } async init(nodeData: INodeData, _: string, options: ICommonObject): Promise { - return initializeMongoDB(nodeData, options) + return initializeMongoDB(nodeData, options); } } const initializeMongoDB = async (nodeData: INodeData, options: ICommonObject): Promise => { - const databaseName = nodeData.inputs?.databaseName as string - const collectionName = nodeData.inputs?.collectionName as string - const memoryKey = nodeData.inputs?.memoryKey as string - const sessionId = nodeData.inputs?.sessionId as string + const databaseName = nodeData.inputs?.databaseName as string; + const collectionName = nodeData.inputs?.collectionName as string; + const memoryKey = nodeData.inputs?.memoryKey as string; + const sessionId = nodeData.inputs?.sessionId as string; - const credentialData = await getCredentialData(nodeData.credential ?? '', options) - const mongoDBConnectUrl = getCredentialParam('mongoDBConnectUrl', credentialData, nodeData) + const credentialData = await getCredentialData(nodeData.credential ?? '', options); + const mongoDBConnectUrl = getCredentialParam('mongoDBConnectUrl', credentialData, nodeData); - const client = new MongoClient(mongoDBConnectUrl) - await client.connect() - - const collection = client.db(databaseName).collection(collectionName) + const client = await getMongoClient(mongoDBConnectUrl); + const collection = client.db(databaseName).collection(collectionName); const mongoDBChatMessageHistory = new MongoDBChatMessageHistory({ collection, sessionId - }) + }); mongoDBChatMessageHistory.getMessages = async (): Promise => { const document = await collection.findOne({ sessionId: (mongoDBChatMessageHistory as any).sessionId - }) - const messages = document?.messages || [] - return messages.map(mapStoredMessageToChatMessage) - } + }); + const messages = document?.messages || []; + return messages.map(mapStoredMessageToChatMessage); + }; mongoDBChatMessageHistory.addMessage = async (message: BaseMessage): Promise => { - const messages = [message].map((msg) => msg.toDict()) + const messages = [message].map((msg) => msg.toDict()); await collection.updateOne( { sessionId: (mongoDBChatMessageHistory as any).sessionId }, { $push: { messages: { $each: messages } } }, { upsert: true } - ) - } + ); + }; mongoDBChatMessageHistory.clear = async (): Promise => { - await collection.deleteOne({ sessionId: (mongoDBChatMessageHistory as any).sessionId }) - } + await collection.deleteOne({ sessionId: (mongoDBChatMessageHistory as any).sessionId }); + }; return new BufferMemoryExtended({ memoryKey: memoryKey ?? 'chat_history', chatHistory: mongoDBChatMessageHistory, sessionId, collection - }) -} + }); +}; interface BufferMemoryExtendedInput { - collection: Collection - sessionId: string + collection: Collection; + sessionId: string; } class BufferMemoryExtended extends FlowiseMemory implements MemoryMethods { - sessionId = '' - collection: Collection + sessionId = ''; + collection: Collection; constructor(fields: BufferMemoryInput & BufferMemoryExtendedInput) { - super(fields) - this.sessionId = fields.sessionId - this.collection = fields.collection + super(fields); + this.sessionId = fields.sessionId; + this.collection = fields.collection; } async getChatMessages(overrideSessionId = '', returnBaseMessages = false): Promise { - if (!this.collection) return [] + if (!this.collection) return []; - const id = overrideSessionId ?? this.sessionId - const document = await this.collection.findOne({ sessionId: id }) - const messages = document?.messages || [] - const baseMessages = messages.map(mapStoredMessageToChatMessage) - return returnBaseMessages ? baseMessages : convertBaseMessagetoIMessage(baseMessages) + const id = overrideSessionId ?? this.sessionId; + const document = await this.collection.findOne({ sessionId: id }); + const messages = document?.messages || []; + const baseMessages = messages.map(mapStoredMessageToChatMessage); + return returnBaseMessages ? baseMessages : convertBaseMessagetoIMessage(baseMessages); } async addChatMessages(msgArray: { text: string; type: MessageType }[], overrideSessionId = ''): Promise { - if (!this.collection) return + if (!this.collection) return; - const id = overrideSessionId ?? this.sessionId - const input = msgArray.find((msg) => msg.type === 'userMessage') - const output = msgArray.find((msg) => msg.type === 'apiMessage') + const id = overrideSessionId ?? this.sessionId; + const input = msgArray.find((msg) => msg.type === 'userMessage'); + const output = msgArray.find((msg) => msg.type === 'apiMessage'); if (input) { - const newInputMessage = new HumanMessage(input.text) - const messageToAdd = [newInputMessage].map((msg) => msg.toDict()) + const newInputMessage = new HumanMessage(input.text); + const messageToAdd = [newInputMessage].map((msg) => msg.toDict()); await this.collection.updateOne( { sessionId: id }, { $push: { messages: { $each: messageToAdd } } }, { upsert: true } - ) + ); } if (output) { - const newOutputMessage = new AIMessage(output.text) - const messageToAdd = [newOutputMessage].map((msg) => msg.toDict()) + const newOutputMessage = new AIMessage(output.text); + const messageToAdd = [newOutputMessage].map((msg) => msg.toDict()); await this.collection.updateOne( { sessionId: id }, { $push: { messages: { $each: messageToAdd } } }, { upsert: true } - ) + ); } } async clearChatMessages(overrideSessionId = ''): Promise { - if (!this.collection) return + if (!this.collection) return; - const id = overrideSessionId ?? this.sessionId - await this.collection.deleteOne({ sessionId: id }) - await this.clear() + const id = overrideSessionId ?? this.sessionId; + await this.collection.deleteOne({ sessionId: id }); + await this.clear(); } } -module.exports = { nodeClass: MongoDB_Memory } +module.exports = { nodeClass: MongoDB_Memory }; From e154461f1dac59415f867ed5acee7e0f6d195cdb Mon Sep 17 00:00:00 2001 From: Anuj Verma <42962743+Ashes47@users.noreply.github.com> Date: Sun, 28 Jan 2024 21:33:09 +0530 Subject: [PATCH 2/6] Update MongoDBMemory.ts --- .../memory/MongoDBMemory/MongoDBMemory.ts | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/packages/components/nodes/memory/MongoDBMemory/MongoDBMemory.ts b/packages/components/nodes/memory/MongoDBMemory/MongoDBMemory.ts index b35de5ae..c2c7ada6 100644 --- a/packages/components/nodes/memory/MongoDBMemory/MongoDBMemory.ts +++ b/packages/components/nodes/memory/MongoDBMemory/MongoDBMemory.ts @@ -1,19 +1,19 @@ -import { MongoClient, Collection, Document } from 'mongodb'; -import { MongoDBChatMessageHistory } from 'langchain/stores/message/mongodb'; -import { BufferMemory, BufferMemoryInput } from 'langchain/memory'; -import { mapStoredMessageToChatMessage, AIMessage, HumanMessage, BaseMessage } from 'langchain/schema'; -import { convertBaseMessagetoIMessage, getBaseClasses, getCredentialData, getCredentialParam } from '../../../src/utils'; -import { FlowiseMemory, ICommonObject, IMessage, INode, INodeData, INodeParams, MemoryMethods, MessageType } from '../../../src/Interface'; +import { MongoClient, Collection, Document } from 'mongodb' +import { MongoDBChatMessageHistory } from 'langchain/stores/message/mongodb' +import { BufferMemory, BufferMemoryInput } from 'langchain/memory' +import { mapStoredMessageToChatMessage, AIMessage, HumanMessage, BaseMessage } from 'langchain/schema' +import { convertBaseMessagetoIMessage, getBaseClasses, getCredentialData, getCredentialParam } from '../../../src/utils' +import { FlowiseMemory, ICommonObject, IMessage, INode, INodeData, INodeParams, MemoryMethods, MessageType } from '../../../src/Interface' -let mongoClientSingleton = null; +let mongoClientSingleton = null const getMongoClient = async (mongoDBConnectUrl) => { if (!mongoClientSingleton) { - mongoClientSingleton = new MongoClient(mongoDBConnectUrl, { useNewUrlParser: true, useUnifiedTopology: true }); - await mongoClientSingleton.connect(); + mongoClientSingleton = new MongoClient(mongoDBConnectUrl, { useNewUrlParser: true, useUnifiedTopology: true }) + await mongoClientSingleton.connect() } - return mongoClientSingleton; -}; + return mongoClientSingleton +} class MongoDB_Memory implements INode { label: string From 36ab1681ac455a5cea1482e98d9765eaf3042071 Mon Sep 17 00:00:00 2001 From: Ashes47 Date: Sun, 28 Jan 2024 21:39:22 +0530 Subject: [PATCH 3/6] lint --- .../memory/MongoDBMemory/MongoDBMemory.ts | 121 +++++++++--------- 1 file changed, 61 insertions(+), 60 deletions(-) diff --git a/packages/components/nodes/memory/MongoDBMemory/MongoDBMemory.ts b/packages/components/nodes/memory/MongoDBMemory/MongoDBMemory.ts index c2c7ada6..3c2903f3 100644 --- a/packages/components/nodes/memory/MongoDBMemory/MongoDBMemory.ts +++ b/packages/components/nodes/memory/MongoDBMemory/MongoDBMemory.ts @@ -28,20 +28,20 @@ class MongoDB_Memory implements INode { inputs: INodeParams[] constructor() { - this.label = 'MongoDB Atlas Chat Memory'; - this.name = 'MongoDBAtlasChatMemory'; - this.version = 1.0; - this.type = 'MongoDBAtlasChatMemory'; - this.icon = 'mongodb.svg'; - this.category = 'Memory'; - this.description = 'Stores the conversation in MongoDB Atlas'; - this.baseClasses = [this.type, ...getBaseClasses(BufferMemory)]; + this.label = 'MongoDB Atlas Chat Memory' + this.name = 'MongoDBAtlasChatMemory' + this.version = 1.0 + this.type = 'MongoDBAtlasChatMemory' + this.icon = 'mongodb.svg' + this.category = 'Memory' + this.description = 'Stores the conversation in MongoDB Atlas' + this.baseClasses = [this.type, ...getBaseClasses(BufferMemory)] this.credential = { label: 'Connect Credential', name: 'credential', type: 'credential', credentialNames: ['mongoDBUrlApi'] - }; + } this.inputs = [ { label: 'Database', @@ -59,7 +59,8 @@ class MongoDB_Memory implements INode { label: 'Session Id', name: 'sessionId', type: 'string', - description: 'If not specified, a random id will be used. Learn more', + description: + 'If not specified, a random id will be used. Learn more', default: '', additionalParams: true, optional: true @@ -71,126 +72,126 @@ class MongoDB_Memory implements INode { default: 'chat_history', additionalParams: true } - ]; + ] } async init(nodeData: INodeData, _: string, options: ICommonObject): Promise { - return initializeMongoDB(nodeData, options); + return initializeMongoDB(nodeData, options) } } const initializeMongoDB = async (nodeData: INodeData, options: ICommonObject): Promise => { - const databaseName = nodeData.inputs?.databaseName as string; - const collectionName = nodeData.inputs?.collectionName as string; - const memoryKey = nodeData.inputs?.memoryKey as string; - const sessionId = nodeData.inputs?.sessionId as string; + const databaseName = nodeData.inputs?.databaseName as string + const collectionName = nodeData.inputs?.collectionName as string + const memoryKey = nodeData.inputs?.memoryKey as string + const sessionId = nodeData.inputs?.sessionId as string - const credentialData = await getCredentialData(nodeData.credential ?? '', options); - const mongoDBConnectUrl = getCredentialParam('mongoDBConnectUrl', credentialData, nodeData); + const credentialData = await getCredentialData(nodeData.credential ?? '', options) + const mongoDBConnectUrl = getCredentialParam('mongoDBConnectUrl', credentialData, nodeData) - const client = await getMongoClient(mongoDBConnectUrl); - const collection = client.db(databaseName).collection(collectionName); + const client = await getMongoClient(mongoDBConnectUrl) + const collection = client.db(databaseName).collection(collectionName) const mongoDBChatMessageHistory = new MongoDBChatMessageHistory({ collection, sessionId - }); + }) mongoDBChatMessageHistory.getMessages = async (): Promise => { const document = await collection.findOne({ sessionId: (mongoDBChatMessageHistory as any).sessionId - }); - const messages = document?.messages || []; - return messages.map(mapStoredMessageToChatMessage); - }; + }) + const messages = document?.messages || [] + return messages.map(mapStoredMessageToChatMessage) + } mongoDBChatMessageHistory.addMessage = async (message: BaseMessage): Promise => { - const messages = [message].map((msg) => msg.toDict()); + const messages = [message].map((msg) => msg.toDict()) await collection.updateOne( { sessionId: (mongoDBChatMessageHistory as any).sessionId }, { $push: { messages: { $each: messages } } }, { upsert: true } - ); - }; + ) + } mongoDBChatMessageHistory.clear = async (): Promise => { - await collection.deleteOne({ sessionId: (mongoDBChatMessageHistory as any).sessionId }); - }; + await collection.deleteOne({ sessionId: (mongoDBChatMessageHistory as any).sessionId }) + } return new BufferMemoryExtended({ memoryKey: memoryKey ?? 'chat_history', chatHistory: mongoDBChatMessageHistory, sessionId, collection - }); -}; + }) +} interface BufferMemoryExtendedInput { - collection: Collection; - sessionId: string; + collection: Collection + sessionId: string } class BufferMemoryExtended extends FlowiseMemory implements MemoryMethods { - sessionId = ''; - collection: Collection; + sessionId = '' + collection: Collection constructor(fields: BufferMemoryInput & BufferMemoryExtendedInput) { - super(fields); - this.sessionId = fields.sessionId; - this.collection = fields.collection; + super(fields) + this.sessionId = fields.sessionId + this.collection = fields.collection } async getChatMessages(overrideSessionId = '', returnBaseMessages = false): Promise { - if (!this.collection) return []; + if (!this.collection) return [] - const id = overrideSessionId ?? this.sessionId; - const document = await this.collection.findOne({ sessionId: id }); - const messages = document?.messages || []; - const baseMessages = messages.map(mapStoredMessageToChatMessage); - return returnBaseMessages ? baseMessages : convertBaseMessagetoIMessage(baseMessages); + const id = overrideSessionId ?? this.sessionId + const document = await this.collection.findOne({ sessionId: id }) + const messages = document?.messages || [] + const baseMessages = messages.map(mapStoredMessageToChatMessage) + return returnBaseMessages ? baseMessages : convertBaseMessagetoIMessage(baseMessages) } async addChatMessages(msgArray: { text: string; type: MessageType }[], overrideSessionId = ''): Promise { - if (!this.collection) return; + if (!this.collection) return - const id = overrideSessionId ?? this.sessionId; - const input = msgArray.find((msg) => msg.type === 'userMessage'); - const output = msgArray.find((msg) => msg.type === 'apiMessage'); + const id = overrideSessionId ?? this.sessionId + const input = msgArray.find((msg) => msg.type === 'userMessage') + const output = msgArray.find((msg) => msg.type === 'apiMessage') if (input) { - const newInputMessage = new HumanMessage(input.text); - const messageToAdd = [newInputMessage].map((msg) => msg.toDict()); + const newInputMessage = new HumanMessage(input.text) + const messageToAdd = [newInputMessage].map((msg) => msg.toDict()) await this.collection.updateOne( { sessionId: id }, { $push: { messages: { $each: messageToAdd } } }, { upsert: true } - ); + ) } if (output) { - const newOutputMessage = new AIMessage(output.text); - const messageToAdd = [newOutputMessage].map((msg) => msg.toDict()); + const newOutputMessage = new AIMessage(output.text) + const messageToAdd = [newOutputMessage].map((msg) => msg.toDict()) await this.collection.updateOne( { sessionId: id }, { $push: { messages: { $each: messageToAdd } } }, { upsert: true } - ); + ) } } async clearChatMessages(overrideSessionId = ''): Promise { - if (!this.collection) return; + if (!this.collection) return - const id = overrideSessionId ?? this.sessionId; - await this.collection.deleteOne({ sessionId: id }); - await this.clear(); + const id = overrideSessionId ?? this.sessionId + await this.collection.deleteOne({ sessionId: id }) + await this.clear() } } -module.exports = { nodeClass: MongoDB_Memory }; +module.exports = { nodeClass: MongoDB_Memory } From 51388d50577f11b6d6757d1bbeb29e5d1f01db2f Mon Sep 17 00:00:00 2001 From: Ashes47 Date: Sun, 28 Jan 2024 21:48:15 +0530 Subject: [PATCH 4/6] update --- .../nodes/memory/MongoDBMemory/MongoDBMemory.ts | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/packages/components/nodes/memory/MongoDBMemory/MongoDBMemory.ts b/packages/components/nodes/memory/MongoDBMemory/MongoDBMemory.ts index 3c2903f3..7399ad81 100644 --- a/packages/components/nodes/memory/MongoDBMemory/MongoDBMemory.ts +++ b/packages/components/nodes/memory/MongoDBMemory/MongoDBMemory.ts @@ -5,12 +5,15 @@ import { mapStoredMessageToChatMessage, AIMessage, HumanMessage, BaseMessage } f import { convertBaseMessagetoIMessage, getBaseClasses, getCredentialData, getCredentialParam } from '../../../src/utils' import { FlowiseMemory, ICommonObject, IMessage, INode, INodeData, INodeParams, MemoryMethods, MessageType } from '../../../src/Interface' -let mongoClientSingleton = null +let mongoClientSingleton: MongoClient +let mongoUrl: string -const getMongoClient = async (mongoDBConnectUrl) => { - if (!mongoClientSingleton) { - mongoClientSingleton = new MongoClient(mongoDBConnectUrl, { useNewUrlParser: true, useUnifiedTopology: true }) +const getMongoClient = async (newMongoUrl: string) => { + if (!mongoClientSingleton || newMongoUrl !== mongoUrl) { + mongoClientSingleton = new MongoClient(newMongoUrl) + mongoUrl = newMongoUrl await mongoClientSingleton.connect() + return mongoClientSingleton } return mongoClientSingleton } From 1c108f35999a490c1d3727cdfb32736765d62587 Mon Sep 17 00:00:00 2001 From: Ashes47 Date: Sun, 28 Jan 2024 21:51:27 +0530 Subject: [PATCH 5/6] update --- .../nodes/memory/MongoDBMemory/MongoDBMemory.ts | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/packages/components/nodes/memory/MongoDBMemory/MongoDBMemory.ts b/packages/components/nodes/memory/MongoDBMemory/MongoDBMemory.ts index 7399ad81..b7309dcd 100644 --- a/packages/components/nodes/memory/MongoDBMemory/MongoDBMemory.ts +++ b/packages/components/nodes/memory/MongoDBMemory/MongoDBMemory.ts @@ -9,15 +9,20 @@ let mongoClientSingleton: MongoClient let mongoUrl: string const getMongoClient = async (newMongoUrl: string) => { - if (!mongoClientSingleton || newMongoUrl !== mongoUrl) { + if (!mongoClientSingleton) { + // if client doesn't exists + mongoClientSingleton = new MongoClient(newMongoUrl) + mongoUrl = newMongoUrl + return mongoClientSingleton + } else if (mongoClientSingleton && newMongoUrl !== mongoUrl) { + // if client exists but url changed + mongoClientSingleton.close() mongoClientSingleton = new MongoClient(newMongoUrl) mongoUrl = newMongoUrl - await mongoClientSingleton.connect() return mongoClientSingleton } return mongoClientSingleton } - class MongoDB_Memory implements INode { label: string name: string From 393f9b57c69e1405731f7a9014a6b790713fd772 Mon Sep 17 00:00:00 2001 From: Henry Date: Sun, 28 Jan 2024 17:18:18 +0000 Subject: [PATCH 6/6] use singleton redis connection --- .../nodes/cache/RedisCache/RedisCache.ts | 43 ++++++++++++- .../cache/RedisCache/RedisEmbeddingsCache.ts | 43 ++++++++++++- .../RedisBackedChatMemory.ts | 61 ++++++++++++------- .../nodes/vectorstores/Qdrant/Qdrant.ts | 7 --- .../nodes/vectorstores/Redis/Redis.ts | 31 ++++++++-- .../vectorstores/Redis/RedisSearchBase.ts | 28 +++++++-- .../vectorstores/Redis/Redis_Existing.ts | 1 - .../nodes/vectorstores/Redis/Redis_Upsert.ts | 1 - 8 files changed, 169 insertions(+), 46 deletions(-) diff --git a/packages/components/nodes/cache/RedisCache/RedisCache.ts b/packages/components/nodes/cache/RedisCache/RedisCache.ts index 4e61c239..c93adf58 100644 --- a/packages/components/nodes/cache/RedisCache/RedisCache.ts +++ b/packages/components/nodes/cache/RedisCache/RedisCache.ts @@ -1,9 +1,46 @@ import { getBaseClasses, getCredentialData, getCredentialParam, ICommonObject, INode, INodeData, INodeParams } from '../../../src' import { RedisCache as LangchainRedisCache } from 'langchain/cache/ioredis' -import { Redis } from 'ioredis' +import { Redis, RedisOptions } from 'ioredis' +import { isEqual } from 'lodash' import { Generation, ChatGeneration, StoredGeneration, mapStoredMessageToChatMessage } from 'langchain/schema' import hash from 'object-hash' +let redisClientSingleton: Redis +let redisClientOption: RedisOptions +let redisClientUrl: string + +const getRedisClientbyOption = (option: RedisOptions) => { + if (!redisClientSingleton) { + // if client doesn't exists + redisClientSingleton = new Redis(option) + redisClientOption = option + return redisClientSingleton + } else if (redisClientSingleton && !isEqual(option, redisClientOption)) { + // if client exists but option changed + redisClientSingleton.quit() + redisClientSingleton = new Redis(option) + redisClientOption = option + return redisClientSingleton + } + return redisClientSingleton +} + +const getRedisClientbyUrl = (url: string) => { + if (!redisClientSingleton) { + // if client doesn't exists + redisClientSingleton = new Redis(url) + redisClientUrl = url + return redisClientSingleton + } else if (redisClientSingleton && url !== redisClientUrl) { + // if client exists but option changed + redisClientSingleton.quit() + redisClientSingleton = new Redis(url) + redisClientUrl = url + return redisClientSingleton + } + return redisClientSingleton +} + class RedisCache implements INode { label: string name: string @@ -60,7 +97,7 @@ class RedisCache implements INode { const tlsOptions = sslEnabled === true ? { tls: { rejectUnauthorized: false } } : {} - client = new Redis({ + client = getRedisClientbyOption({ port: portStr ? parseInt(portStr) : 6379, host, username, @@ -68,7 +105,7 @@ class RedisCache implements INode { ...tlsOptions }) } else { - client = new Redis(redisUrl) + client = getRedisClientbyUrl(redisUrl) } const redisClient = new LangchainRedisCache(client) diff --git a/packages/components/nodes/cache/RedisCache/RedisEmbeddingsCache.ts b/packages/components/nodes/cache/RedisCache/RedisEmbeddingsCache.ts index fe1b4df8..b74413fe 100644 --- a/packages/components/nodes/cache/RedisCache/RedisEmbeddingsCache.ts +++ b/packages/components/nodes/cache/RedisCache/RedisEmbeddingsCache.ts @@ -1,9 +1,46 @@ import { getBaseClasses, getCredentialData, getCredentialParam, ICommonObject, INode, INodeData, INodeParams } from '../../../src' -import { Redis } from 'ioredis' +import { Redis, RedisOptions } from 'ioredis' +import { isEqual } from 'lodash' import { CacheBackedEmbeddings } from 'langchain/embeddings/cache_backed' import { RedisByteStore } from 'langchain/storage/ioredis' import { Embeddings } from 'langchain/embeddings/base' +let redisClientSingleton: Redis +let redisClientOption: RedisOptions +let redisClientUrl: string + +const getRedisClientbyOption = (option: RedisOptions) => { + if (!redisClientSingleton) { + // if client doesn't exists + redisClientSingleton = new Redis(option) + redisClientOption = option + return redisClientSingleton + } else if (redisClientSingleton && !isEqual(option, redisClientOption)) { + // if client exists but option changed + redisClientSingleton.quit() + redisClientSingleton = new Redis(option) + redisClientOption = option + return redisClientSingleton + } + return redisClientSingleton +} + +const getRedisClientbyUrl = (url: string) => { + if (!redisClientSingleton) { + // if client doesn't exists + redisClientSingleton = new Redis(url) + redisClientUrl = url + return redisClientSingleton + } else if (redisClientSingleton && url !== redisClientUrl) { + // if client exists but option changed + redisClientSingleton.quit() + redisClientSingleton = new Redis(url) + redisClientUrl = url + return redisClientSingleton + } + return redisClientSingleton +} + class RedisEmbeddingsCache implements INode { label: string name: string @@ -75,7 +112,7 @@ class RedisEmbeddingsCache implements INode { const tlsOptions = sslEnabled === true ? { tls: { rejectUnauthorized: false } } : {} - client = new Redis({ + client = getRedisClientbyOption({ port: portStr ? parseInt(portStr) : 6379, host, username, @@ -83,7 +120,7 @@ class RedisEmbeddingsCache implements INode { ...tlsOptions }) } else { - client = new Redis(redisUrl) + client = getRedisClientbyUrl(redisUrl) } ttl ??= '3600' diff --git a/packages/components/nodes/memory/RedisBackedChatMemory/RedisBackedChatMemory.ts b/packages/components/nodes/memory/RedisBackedChatMemory/RedisBackedChatMemory.ts index baf4ea6b..c54e07b5 100644 --- a/packages/components/nodes/memory/RedisBackedChatMemory/RedisBackedChatMemory.ts +++ b/packages/components/nodes/memory/RedisBackedChatMemory/RedisBackedChatMemory.ts @@ -1,10 +1,47 @@ -import { Redis } from 'ioredis' +import { Redis, RedisOptions } from 'ioredis' +import { isEqual } from 'lodash' import { BufferMemory, BufferMemoryInput } from 'langchain/memory' import { RedisChatMessageHistory, RedisChatMessageHistoryInput } from 'langchain/stores/message/ioredis' import { mapStoredMessageToChatMessage, BaseMessage, AIMessage, HumanMessage } from 'langchain/schema' import { INode, INodeData, INodeParams, ICommonObject, MessageType, IMessage, MemoryMethods, FlowiseMemory } from '../../../src/Interface' import { convertBaseMessagetoIMessage, getBaseClasses, getCredentialData, getCredentialParam } from '../../../src/utils' +let redisClientSingleton: Redis +let redisClientOption: RedisOptions +let redisClientUrl: string + +const getRedisClientbyOption = (option: RedisOptions) => { + if (!redisClientSingleton) { + // if client doesn't exists + redisClientSingleton = new Redis(option) + redisClientOption = option + return redisClientSingleton + } else if (redisClientSingleton && !isEqual(option, redisClientOption)) { + // if client exists but option changed + redisClientSingleton.quit() + redisClientSingleton = new Redis(option) + redisClientOption = option + return redisClientSingleton + } + return redisClientSingleton +} + +const getRedisClientbyUrl = (url: string) => { + if (!redisClientSingleton) { + // if client doesn't exists + redisClientSingleton = new Redis(url) + redisClientUrl = url + return redisClientSingleton + } else if (redisClientSingleton && url !== redisClientUrl) { + // if client exists but option changed + redisClientSingleton.quit() + redisClientSingleton = new Redis(url) + redisClientUrl = url + return redisClientSingleton + } + return redisClientSingleton +} + class RedisBackedChatMemory_Memory implements INode { label: string name: string @@ -95,7 +132,7 @@ const initalizeRedis = async (nodeData: INodeData, options: ICommonObject): Prom const tlsOptions = sslEnabled === true ? { tls: { rejectUnauthorized: false } } : {} - client = new Redis({ + client = getRedisClientbyOption({ port: portStr ? parseInt(portStr) : 6379, host, username, @@ -103,7 +140,7 @@ const initalizeRedis = async (nodeData: INodeData, options: ICommonObject): Prom ...tlsOptions }) } else { - client = new Redis(redisUrl) + client = getRedisClientbyUrl(redisUrl) } let obj: RedisChatMessageHistoryInput = { @@ -120,24 +157,6 @@ const initalizeRedis = async (nodeData: INodeData, options: ICommonObject): Prom const redisChatMessageHistory = new RedisChatMessageHistory(obj) - /*redisChatMessageHistory.getMessages = async (): Promise => { - const rawStoredMessages = await client.lrange((redisChatMessageHistory as any).sessionId, windowSize ? -windowSize : 0, -1) - const orderedMessages = rawStoredMessages.reverse().map((message) => JSON.parse(message)) - return orderedMessages.map(mapStoredMessageToChatMessage) - } - - redisChatMessageHistory.addMessage = async (message: BaseMessage): Promise => { - const messageToAdd = [message].map((msg) => msg.toDict()) - await client.lpush((redisChatMessageHistory as any).sessionId, JSON.stringify(messageToAdd[0])) - if (sessionTTL) { - await client.expire((redisChatMessageHistory as any).sessionId, sessionTTL) - } - } - - redisChatMessageHistory.clear = async (): Promise => { - await client.del((redisChatMessageHistory as any).sessionId) - }*/ - const memory = new BufferMemoryExtended({ memoryKey: memoryKey ?? 'chat_history', chatHistory: redisChatMessageHistory, diff --git a/packages/components/nodes/vectorstores/Qdrant/Qdrant.ts b/packages/components/nodes/vectorstores/Qdrant/Qdrant.ts index 1d5f7788..80899942 100644 --- a/packages/components/nodes/vectorstores/Qdrant/Qdrant.ts +++ b/packages/components/nodes/vectorstores/Qdrant/Qdrant.ts @@ -1,6 +1,5 @@ import { flatten } from 'lodash' import { QdrantClient } from '@qdrant/js-client-rest' -import type { Schemas as QdrantSchemas } from '@qdrant/js-client-rest' import { VectorStoreRetrieverInput } from 'langchain/vectorstores/base' import { Document } from 'langchain/document' import { QdrantVectorStore, QdrantLibArgs } from 'langchain/vectorstores/qdrant' @@ -9,12 +8,6 @@ import { ICommonObject, INode, INodeData, INodeOutputsValue, INodeParams } from import { getBaseClasses, getCredentialData, getCredentialParam } from '../../../src/utils' type RetrieverConfig = Partial> -type QdrantSearchResponse = QdrantSchemas['ScoredPoint'] & { - payload: { - metadata: object - content: string - } -} class Qdrant_VectorStores implements INode { label: string diff --git a/packages/components/nodes/vectorstores/Redis/Redis.ts b/packages/components/nodes/vectorstores/Redis/Redis.ts index 49f9e8ff..0dddf782 100644 --- a/packages/components/nodes/vectorstores/Redis/Redis.ts +++ b/packages/components/nodes/vectorstores/Redis/Redis.ts @@ -1,5 +1,5 @@ -import { flatten } from 'lodash' -import { createClient, SearchOptions } from 'redis' +import { flatten, isEqual } from 'lodash' +import { createClient, SearchOptions, RedisClientOptions } from 'redis' import { Embeddings } from 'langchain/embeddings/base' import { RedisVectorStore, RedisVectorStoreConfig } from 'langchain/vectorstores/redis' import { Document } from 'langchain/document' @@ -7,6 +7,27 @@ import { ICommonObject, INode, INodeData, INodeOutputsValue, INodeParams } from import { getBaseClasses, getCredentialData, getCredentialParam } from '../../../src/utils' import { escapeAllStrings, escapeSpecialChars, unEscapeSpecialChars } from './utils' +let redisClientSingleton: ReturnType +let redisClientOption: RedisClientOptions + +const getRedisClient = async (option: RedisClientOptions) => { + if (!redisClientSingleton) { + // if client doesn't exists + redisClientSingleton = createClient(option) + await redisClientSingleton.connect() + redisClientOption = option + return redisClientSingleton + } else if (redisClientSingleton && !isEqual(option, redisClientOption)) { + // if client exists but option changed + redisClientSingleton.quit() + redisClientSingleton = createClient(option) + await redisClientSingleton.connect() + redisClientOption = option + return redisClientSingleton + } + return redisClientSingleton +} + class Redis_VectorStores implements INode { label: string name: string @@ -149,8 +170,7 @@ class Redis_VectorStores implements INode { } try { - const redisClient = createClient({ url: redisUrl }) - await redisClient.connect() + const redisClient = await getRedisClient({ url: redisUrl }) const storeConfig: RedisVectorStoreConfig = { redisClient: redisClient, @@ -210,8 +230,7 @@ class Redis_VectorStores implements INode { redisUrl = 'redis://' + username + ':' + password + '@' + host + ':' + portStr } - const redisClient = createClient({ url: redisUrl }) - await redisClient.connect() + const redisClient = await getRedisClient({ url: redisUrl }) const storeConfig: RedisVectorStoreConfig = { redisClient: redisClient, diff --git a/packages/components/nodes/vectorstores/Redis/RedisSearchBase.ts b/packages/components/nodes/vectorstores/Redis/RedisSearchBase.ts index b6aa6ebb..e87b49f9 100644 --- a/packages/components/nodes/vectorstores/Redis/RedisSearchBase.ts +++ b/packages/components/nodes/vectorstores/Redis/RedisSearchBase.ts @@ -7,13 +7,34 @@ import { INodeOutputsValue, INodeParams } from '../../../src' - import { Embeddings } from 'langchain/embeddings/base' import { VectorStore } from 'langchain/vectorstores/base' import { Document } from 'langchain/document' -import { createClient, SearchOptions } from 'redis' +import { createClient, SearchOptions, RedisClientOptions } from 'redis' import { RedisVectorStore } from 'langchain/vectorstores/redis' import { escapeSpecialChars, unEscapeSpecialChars } from './utils' +import { isEqual } from 'lodash' + +let redisClientSingleton: ReturnType +let redisClientOption: RedisClientOptions + +const getRedisClient = async (option: RedisClientOptions) => { + if (!redisClientSingleton) { + // if client doesn't exists + redisClientSingleton = createClient(option) + await redisClientSingleton.connect() + redisClientOption = option + return redisClientSingleton + } else if (redisClientSingleton && !isEqual(option, redisClientOption)) { + // if client exists but option changed + redisClientSingleton.quit() + redisClientSingleton = createClient(option) + await redisClientSingleton.connect() + redisClientOption = option + return redisClientSingleton + } + return redisClientSingleton +} export abstract class RedisSearchBase { label: string @@ -141,8 +162,7 @@ export abstract class RedisSearchBase { redisUrl = 'redis://' + username + ':' + password + '@' + host + ':' + portStr } - this.redisClient = createClient({ url: redisUrl }) - await this.redisClient.connect() + this.redisClient = await getRedisClient({ url: redisUrl }) const vectorStore = await this.constructVectorStore(embeddings, indexName, replaceIndex, docs) if (!contentKey || contentKey === '') contentKey = 'content' diff --git a/packages/components/nodes/vectorstores/Redis/Redis_Existing.ts b/packages/components/nodes/vectorstores/Redis/Redis_Existing.ts index e8848d33..6f2ec9b3 100644 --- a/packages/components/nodes/vectorstores/Redis/Redis_Existing.ts +++ b/packages/components/nodes/vectorstores/Redis/Redis_Existing.ts @@ -3,7 +3,6 @@ import { Embeddings } from 'langchain/embeddings/base' import { VectorStore } from 'langchain/vectorstores/base' import { RedisVectorStore, RedisVectorStoreConfig } from 'langchain/vectorstores/redis' import { Document } from 'langchain/document' - import { RedisSearchBase } from './RedisSearchBase' class RedisExisting_VectorStores extends RedisSearchBase implements INode { diff --git a/packages/components/nodes/vectorstores/Redis/Redis_Upsert.ts b/packages/components/nodes/vectorstores/Redis/Redis_Upsert.ts index 4da58eaf..c4394824 100644 --- a/packages/components/nodes/vectorstores/Redis/Redis_Upsert.ts +++ b/packages/components/nodes/vectorstores/Redis/Redis_Upsert.ts @@ -1,7 +1,6 @@ import { ICommonObject, INode, INodeData } from '../../../src/Interface' import { Embeddings } from 'langchain/embeddings/base' import { Document } from 'langchain/document' - import { flatten } from 'lodash' import { RedisSearchBase } from './RedisSearchBase' import { VectorStore } from 'langchain/vectorstores/base'