From e8deeb25cff10b8d17393de3632b6febcee8c336 Mon Sep 17 00:00:00 2001 From: Anuj Verma <42962743+Ashes47@users.noreply.github.com> Date: Sun, 28 Jan 2024 19:45:19 +0530 Subject: [PATCH] Update MongoDBMemory.ts --- .../memory/MongoDBMemory/MongoDBMemory.ts | 145 +++++++++--------- 1 file changed, 76 insertions(+), 69 deletions(-) diff --git a/packages/components/nodes/memory/MongoDBMemory/MongoDBMemory.ts b/packages/components/nodes/memory/MongoDBMemory/MongoDBMemory.ts index c593c20d..b35de5ae 100644 --- a/packages/components/nodes/memory/MongoDBMemory/MongoDBMemory.ts +++ b/packages/components/nodes/memory/MongoDBMemory/MongoDBMemory.ts @@ -1,9 +1,19 @@ -import { MongoClient, Collection, Document } from 'mongodb' -import { MongoDBChatMessageHistory } from 'langchain/stores/message/mongodb' -import { BufferMemory, BufferMemoryInput } from 'langchain/memory' -import { mapStoredMessageToChatMessage, AIMessage, HumanMessage, BaseMessage } from 'langchain/schema' -import { convertBaseMessagetoIMessage, getBaseClasses, getCredentialData, getCredentialParam } from '../../../src/utils' -import { FlowiseMemory, ICommonObject, IMessage, INode, INodeData, INodeParams, MemoryMethods, MessageType } from '../../../src/Interface' +import { MongoClient, Collection, Document } from 'mongodb'; +import { MongoDBChatMessageHistory } from 'langchain/stores/message/mongodb'; +import { BufferMemory, BufferMemoryInput } from 'langchain/memory'; +import { mapStoredMessageToChatMessage, AIMessage, HumanMessage, BaseMessage } from 'langchain/schema'; +import { convertBaseMessagetoIMessage, getBaseClasses, getCredentialData, getCredentialParam } from '../../../src/utils'; +import { FlowiseMemory, ICommonObject, IMessage, INode, INodeData, INodeParams, MemoryMethods, MessageType } from '../../../src/Interface'; + +let mongoClientSingleton = null; + +const getMongoClient = async (mongoDBConnectUrl) => { + if (!mongoClientSingleton) { + mongoClientSingleton = new MongoClient(mongoDBConnectUrl, { useNewUrlParser: true, useUnifiedTopology: true }); + await mongoClientSingleton.connect(); + } + return mongoClientSingleton; +}; class MongoDB_Memory implements INode { label: string @@ -18,20 +28,20 @@ class MongoDB_Memory implements INode { inputs: INodeParams[] constructor() { - this.label = 'MongoDB Atlas Chat Memory' - this.name = 'MongoDBAtlasChatMemory' - this.version = 1.0 - this.type = 'MongoDBAtlasChatMemory' - this.icon = 'mongodb.svg' - this.category = 'Memory' - this.description = 'Stores the conversation in MongoDB Atlas' - this.baseClasses = [this.type, ...getBaseClasses(BufferMemory)] + this.label = 'MongoDB Atlas Chat Memory'; + this.name = 'MongoDBAtlasChatMemory'; + this.version = 1.0; + this.type = 'MongoDBAtlasChatMemory'; + this.icon = 'mongodb.svg'; + this.category = 'Memory'; + this.description = 'Stores the conversation in MongoDB Atlas'; + this.baseClasses = [this.type, ...getBaseClasses(BufferMemory)]; this.credential = { label: 'Connect Credential', name: 'credential', type: 'credential', credentialNames: ['mongoDBUrlApi'] - } + }; this.inputs = [ { label: 'Database', @@ -49,8 +59,7 @@ class MongoDB_Memory implements INode { label: 'Session Id', name: 'sessionId', type: 'string', - description: - 'If not specified, a random id will be used. Learn more', + description: 'If not specified, a random id will be used. Learn more', default: '', additionalParams: true, optional: true @@ -62,128 +71,126 @@ class MongoDB_Memory implements INode { default: 'chat_history', additionalParams: true } - ] + ]; } async init(nodeData: INodeData, _: string, options: ICommonObject): Promise { - return initializeMongoDB(nodeData, options) + return initializeMongoDB(nodeData, options); } } const initializeMongoDB = async (nodeData: INodeData, options: ICommonObject): Promise => { - const databaseName = nodeData.inputs?.databaseName as string - const collectionName = nodeData.inputs?.collectionName as string - const memoryKey = nodeData.inputs?.memoryKey as string - const sessionId = nodeData.inputs?.sessionId as string + const databaseName = nodeData.inputs?.databaseName as string; + const collectionName = nodeData.inputs?.collectionName as string; + const memoryKey = nodeData.inputs?.memoryKey as string; + const sessionId = nodeData.inputs?.sessionId as string; - const credentialData = await getCredentialData(nodeData.credential ?? '', options) - const mongoDBConnectUrl = getCredentialParam('mongoDBConnectUrl', credentialData, nodeData) + const credentialData = await getCredentialData(nodeData.credential ?? '', options); + const mongoDBConnectUrl = getCredentialParam('mongoDBConnectUrl', credentialData, nodeData); - const client = new MongoClient(mongoDBConnectUrl) - await client.connect() - - const collection = client.db(databaseName).collection(collectionName) + const client = await getMongoClient(mongoDBConnectUrl); + const collection = client.db(databaseName).collection(collectionName); const mongoDBChatMessageHistory = new MongoDBChatMessageHistory({ collection, sessionId - }) + }); mongoDBChatMessageHistory.getMessages = async (): Promise => { const document = await collection.findOne({ sessionId: (mongoDBChatMessageHistory as any).sessionId - }) - const messages = document?.messages || [] - return messages.map(mapStoredMessageToChatMessage) - } + }); + const messages = document?.messages || []; + return messages.map(mapStoredMessageToChatMessage); + }; mongoDBChatMessageHistory.addMessage = async (message: BaseMessage): Promise => { - const messages = [message].map((msg) => msg.toDict()) + const messages = [message].map((msg) => msg.toDict()); await collection.updateOne( { sessionId: (mongoDBChatMessageHistory as any).sessionId }, { $push: { messages: { $each: messages } } }, { upsert: true } - ) - } + ); + }; mongoDBChatMessageHistory.clear = async (): Promise => { - await collection.deleteOne({ sessionId: (mongoDBChatMessageHistory as any).sessionId }) - } + await collection.deleteOne({ sessionId: (mongoDBChatMessageHistory as any).sessionId }); + }; return new BufferMemoryExtended({ memoryKey: memoryKey ?? 'chat_history', chatHistory: mongoDBChatMessageHistory, sessionId, collection - }) -} + }); +}; interface BufferMemoryExtendedInput { - collection: Collection - sessionId: string + collection: Collection; + sessionId: string; } class BufferMemoryExtended extends FlowiseMemory implements MemoryMethods { - sessionId = '' - collection: Collection + sessionId = ''; + collection: Collection; constructor(fields: BufferMemoryInput & BufferMemoryExtendedInput) { - super(fields) - this.sessionId = fields.sessionId - this.collection = fields.collection + super(fields); + this.sessionId = fields.sessionId; + this.collection = fields.collection; } async getChatMessages(overrideSessionId = '', returnBaseMessages = false): Promise { - if (!this.collection) return [] + if (!this.collection) return []; - const id = overrideSessionId ?? this.sessionId - const document = await this.collection.findOne({ sessionId: id }) - const messages = document?.messages || [] - const baseMessages = messages.map(mapStoredMessageToChatMessage) - return returnBaseMessages ? baseMessages : convertBaseMessagetoIMessage(baseMessages) + const id = overrideSessionId ?? this.sessionId; + const document = await this.collection.findOne({ sessionId: id }); + const messages = document?.messages || []; + const baseMessages = messages.map(mapStoredMessageToChatMessage); + return returnBaseMessages ? baseMessages : convertBaseMessagetoIMessage(baseMessages); } async addChatMessages(msgArray: { text: string; type: MessageType }[], overrideSessionId = ''): Promise { - if (!this.collection) return + if (!this.collection) return; - const id = overrideSessionId ?? this.sessionId - const input = msgArray.find((msg) => msg.type === 'userMessage') - const output = msgArray.find((msg) => msg.type === 'apiMessage') + const id = overrideSessionId ?? this.sessionId; + const input = msgArray.find((msg) => msg.type === 'userMessage'); + const output = msgArray.find((msg) => msg.type === 'apiMessage'); if (input) { - const newInputMessage = new HumanMessage(input.text) - const messageToAdd = [newInputMessage].map((msg) => msg.toDict()) + const newInputMessage = new HumanMessage(input.text); + const messageToAdd = [newInputMessage].map((msg) => msg.toDict()); await this.collection.updateOne( { sessionId: id }, { $push: { messages: { $each: messageToAdd } } }, { upsert: true } - ) + ); } if (output) { - const newOutputMessage = new AIMessage(output.text) - const messageToAdd = [newOutputMessage].map((msg) => msg.toDict()) + const newOutputMessage = new AIMessage(output.text); + const messageToAdd = [newOutputMessage].map((msg) => msg.toDict()); await this.collection.updateOne( { sessionId: id }, { $push: { messages: { $each: messageToAdd } } }, { upsert: true } - ) + ); } } async clearChatMessages(overrideSessionId = ''): Promise { - if (!this.collection) return + if (!this.collection) return; - const id = overrideSessionId ?? this.sessionId - await this.collection.deleteOne({ sessionId: id }) - await this.clear() + const id = overrideSessionId ?? this.sessionId; + await this.collection.deleteOne({ sessionId: id }); + await this.clear(); } } -module.exports = { nodeClass: MongoDB_Memory } +module.exports = { nodeClass: MongoDB_Memory };