mirror of
https://github.com/farcasclaudiu/Flowise.git
synced 2026-06-28 23:01:09 +03:00
add redis backed chat message fix
This commit is contained in:
@@ -0,0 +1,43 @@
|
|||||||
|
import { INodeParams, INodeCredential } from '../src/Interface'
|
||||||
|
|
||||||
|
class RedisApi implements INodeCredential {
|
||||||
|
label: string
|
||||||
|
name: string
|
||||||
|
version: number
|
||||||
|
description: string
|
||||||
|
inputs: INodeParams[]
|
||||||
|
|
||||||
|
constructor() {
|
||||||
|
this.label = 'Redis API'
|
||||||
|
this.name = 'redisApi'
|
||||||
|
this.version = 1.0
|
||||||
|
this.inputs = [
|
||||||
|
{
|
||||||
|
label: 'Redis Host',
|
||||||
|
name: 'redisHost',
|
||||||
|
type: 'string',
|
||||||
|
default: '127.0.0.1'
|
||||||
|
},
|
||||||
|
{
|
||||||
|
label: 'Port',
|
||||||
|
name: 'redisPort',
|
||||||
|
type: 'number',
|
||||||
|
default: '6789'
|
||||||
|
},
|
||||||
|
{
|
||||||
|
label: 'User',
|
||||||
|
name: 'redisUser',
|
||||||
|
type: 'string',
|
||||||
|
placeholder: '<REDIS_USERNAME>'
|
||||||
|
},
|
||||||
|
{
|
||||||
|
label: 'Password',
|
||||||
|
name: 'redisPwd',
|
||||||
|
type: 'password',
|
||||||
|
placeholder: '<REDIS_PASSWORD>'
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = { credClass: RedisApi }
|
||||||
@@ -1,9 +1,9 @@
|
|||||||
import { INode, INodeData, INodeParams } from '../../../src/Interface'
|
import { INode, INodeData, INodeParams, ICommonObject } from '../../../src/Interface'
|
||||||
import { getBaseClasses } from '../../../src/utils'
|
import { getBaseClasses, getCredentialData, getCredentialParam } from '../../../src/utils'
|
||||||
import { ICommonObject } from '../../../src'
|
|
||||||
import { BufferMemory, BufferMemoryInput } from 'langchain/memory'
|
import { BufferMemory, BufferMemoryInput } from 'langchain/memory'
|
||||||
import { RedisChatMessageHistory, RedisChatMessageHistoryInput } from 'langchain/stores/message/redis'
|
import { RedisChatMessageHistory, RedisChatMessageHistoryInput } from 'langchain/stores/message/ioredis'
|
||||||
import { createClient } from 'redis'
|
import { mapStoredMessageToChatMessage, BaseMessage } from 'langchain/schema'
|
||||||
|
import { Redis } from 'ioredis'
|
||||||
|
|
||||||
class RedisBackedChatMemory_Memory implements INode {
|
class RedisBackedChatMemory_Memory implements INode {
|
||||||
label: string
|
label: string
|
||||||
@@ -15,6 +15,7 @@ class RedisBackedChatMemory_Memory implements INode {
|
|||||||
category: string
|
category: string
|
||||||
baseClasses: string[]
|
baseClasses: string[]
|
||||||
inputs: INodeParams[]
|
inputs: INodeParams[]
|
||||||
|
credential: INodeParams
|
||||||
|
|
||||||
constructor() {
|
constructor() {
|
||||||
this.label = 'Redis-Backed Chat Memory'
|
this.label = 'Redis-Backed Chat Memory'
|
||||||
@@ -25,13 +26,14 @@ class RedisBackedChatMemory_Memory implements INode {
|
|||||||
this.category = 'Memory'
|
this.category = 'Memory'
|
||||||
this.description = 'Summarizes the conversation and stores the memory in Redis server'
|
this.description = 'Summarizes the conversation and stores the memory in Redis server'
|
||||||
this.baseClasses = [this.type, ...getBaseClasses(BufferMemory)]
|
this.baseClasses = [this.type, ...getBaseClasses(BufferMemory)]
|
||||||
|
this.credential = {
|
||||||
|
label: 'Connect Credential',
|
||||||
|
name: 'credential',
|
||||||
|
type: 'credential',
|
||||||
|
optional: true,
|
||||||
|
credentialNames: ['redisApi']
|
||||||
|
}
|
||||||
this.inputs = [
|
this.inputs = [
|
||||||
{
|
|
||||||
label: 'Base URL',
|
|
||||||
name: 'baseURL',
|
|
||||||
type: 'string',
|
|
||||||
default: 'redis://localhost:6379'
|
|
||||||
},
|
|
||||||
{
|
{
|
||||||
label: 'Session Id',
|
label: 'Session Id',
|
||||||
name: 'sessionId',
|
name: 'sessionId',
|
||||||
@@ -60,11 +62,11 @@ class RedisBackedChatMemory_Memory implements INode {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async init(nodeData: INodeData, _: string, options: ICommonObject): Promise<any> {
|
async init(nodeData: INodeData, _: string, options: ICommonObject): Promise<any> {
|
||||||
return initalizeRedis(nodeData, options)
|
return await initalizeRedis(nodeData, options)
|
||||||
}
|
}
|
||||||
|
|
||||||
async clearSessionMemory(nodeData: INodeData, options: ICommonObject): Promise<void> {
|
async clearSessionMemory(nodeData: INodeData, options: ICommonObject): Promise<void> {
|
||||||
const redis = initalizeRedis(nodeData, options)
|
const redis = await initalizeRedis(nodeData, options)
|
||||||
const sessionId = nodeData.inputs?.sessionId as string
|
const sessionId = nodeData.inputs?.sessionId as string
|
||||||
const chatId = options?.chatId as string
|
const chatId = options?.chatId as string
|
||||||
options.logger.info(`Clearing Redis memory session ${sessionId ? sessionId : chatId}`)
|
options.logger.info(`Clearing Redis memory session ${sessionId ? sessionId : chatId}`)
|
||||||
@@ -73,17 +75,28 @@ class RedisBackedChatMemory_Memory implements INode {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const initalizeRedis = (nodeData: INodeData, options: ICommonObject): BufferMemory => {
|
const initalizeRedis = async (nodeData: INodeData, options: ICommonObject): Promise<BufferMemory> => {
|
||||||
const baseURL = nodeData.inputs?.baseURL as string
|
|
||||||
const sessionId = nodeData.inputs?.sessionId as string
|
const sessionId = nodeData.inputs?.sessionId as string
|
||||||
const sessionTTL = nodeData.inputs?.sessionTTL as number
|
const sessionTTL = nodeData.inputs?.sessionTTL as number
|
||||||
const memoryKey = nodeData.inputs?.memoryKey as string
|
const memoryKey = nodeData.inputs?.memoryKey as string
|
||||||
const chatId = options?.chatId as string
|
const chatId = options?.chatId as string
|
||||||
|
|
||||||
|
const credentialData = await getCredentialData(nodeData.credential ?? '', options)
|
||||||
|
const username = getCredentialParam('redisUser', credentialData, nodeData)
|
||||||
|
const password = getCredentialParam('redisPwd', credentialData, nodeData)
|
||||||
|
const portStr = getCredentialParam('redisPort', credentialData, nodeData)
|
||||||
|
const host = getCredentialParam('redisHost', credentialData, nodeData)
|
||||||
|
|
||||||
let isSessionIdUsingChatMessageId = false
|
let isSessionIdUsingChatMessageId = false
|
||||||
if (!sessionId && chatId) isSessionIdUsingChatMessageId = true
|
if (!sessionId && chatId) isSessionIdUsingChatMessageId = true
|
||||||
|
|
||||||
const redisClient = createClient({ url: baseURL })
|
const redisClient = new Redis({
|
||||||
|
port: portStr ? parseInt(portStr) : 6379,
|
||||||
|
host,
|
||||||
|
username,
|
||||||
|
password
|
||||||
|
})
|
||||||
|
|
||||||
let obj: RedisChatMessageHistoryInput = {
|
let obj: RedisChatMessageHistoryInput = {
|
||||||
sessionId: sessionId ? sessionId : chatId,
|
sessionId: sessionId ? sessionId : chatId,
|
||||||
client: redisClient
|
client: redisClient
|
||||||
@@ -98,6 +111,24 @@ const initalizeRedis = (nodeData: INodeData, options: ICommonObject): BufferMemo
|
|||||||
|
|
||||||
const redisChatMessageHistory = new RedisChatMessageHistory(obj)
|
const redisChatMessageHistory = new RedisChatMessageHistory(obj)
|
||||||
|
|
||||||
|
redisChatMessageHistory.getMessages = async (): Promise<BaseMessage[]> => {
|
||||||
|
const rawStoredMessages = await redisClient.lrange(sessionId ? sessionId : chatId, 0, -1)
|
||||||
|
const orderedMessages = rawStoredMessages.reverse().map((message) => JSON.parse(message))
|
||||||
|
return orderedMessages.map(mapStoredMessageToChatMessage)
|
||||||
|
}
|
||||||
|
|
||||||
|
redisChatMessageHistory.addMessage = async (message: BaseMessage): Promise<void> => {
|
||||||
|
const messageToAdd = [message].map((msg) => msg.toDict())
|
||||||
|
await redisClient.lpush(sessionId ? sessionId : chatId, JSON.stringify(messageToAdd[0]))
|
||||||
|
if (sessionTTL) {
|
||||||
|
await redisClient.expire(sessionId ? sessionId : chatId, sessionTTL)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
redisChatMessageHistory.clear = async (): Promise<void> => {
|
||||||
|
await redisClient.del(sessionId ? sessionId : chatId)
|
||||||
|
}
|
||||||
|
|
||||||
const memory = new BufferMemoryExtended({
|
const memory = new BufferMemoryExtended({
|
||||||
memoryKey,
|
memoryKey,
|
||||||
chatHistory: redisChatMessageHistory,
|
chatHistory: redisChatMessageHistory,
|
||||||
|
|||||||
Reference in New Issue
Block a user