mirror of
https://github.com/farcasclaudiu/Flowise.git
synced 2026-06-28 21:00:58 +03:00
Merge pull request #1422 from FlowiseAI/feature/env-vars
Feature/Env Variables
This commit is contained in:
@@ -265,7 +265,7 @@ class OpenAIAssistant_Agents implements INode {
|
||||
// Start tool analytics
|
||||
const toolIds = await analyticHandlers.onToolStart(tool.name, actions[i].toolInput, parentIds)
|
||||
|
||||
const toolOutput = await tool.call(actions[i].toolInput)
|
||||
const toolOutput = await tool.call(actions[i].toolInput, undefined, undefined, threadId)
|
||||
|
||||
// End tool analytics
|
||||
await analyticHandlers.onToolEnd(toolIds, toolOutput)
|
||||
|
||||
@@ -1,10 +1,17 @@
|
||||
import { ICommonObject, INode, INodeData, INodeParams } from '../../../src/Interface'
|
||||
import { initializeAgentExecutorWithOptions, AgentExecutor } from 'langchain/agents'
|
||||
import { getBaseClasses, mapChatHistory } from '../../../src/utils'
|
||||
import { BaseLanguageModel } from 'langchain/base_language'
|
||||
import { FlowiseMemory, ICommonObject, INode, INodeData, INodeParams } from '../../../src/Interface'
|
||||
import { AgentExecutor as LCAgentExecutor, AgentExecutorInput } from 'langchain/agents'
|
||||
import { ChainValues, AgentStep, AgentFinish, AgentAction, BaseMessage, FunctionMessage, AIMessage } from 'langchain/schema'
|
||||
import { OutputParserException } from 'langchain/schema/output_parser'
|
||||
import { CallbackManagerForChainRun } from 'langchain/callbacks'
|
||||
import { formatToOpenAIFunction } from 'langchain/tools'
|
||||
import { ToolInputParsingException, Tool } from '@langchain/core/tools'
|
||||
import { getBaseClasses } from '../../../src/utils'
|
||||
import { flatten } from 'lodash'
|
||||
import { BaseChatMemory } from 'langchain/memory'
|
||||
import { RunnableSequence } from 'langchain/schema/runnable'
|
||||
import { ConsoleCallbackHandler, CustomChainHandler, additionalCallbacks } from '../../../src/handler'
|
||||
import { ChatPromptTemplate, MessagesPlaceholder } from 'langchain/prompts'
|
||||
import { ChatOpenAI } from 'langchain/chat_models/openai'
|
||||
import { OpenAIFunctionsAgentOutputParser } from 'langchain/agents/openai/output_parser'
|
||||
|
||||
class OpenAIFunctionAgent_Agents implements INode {
|
||||
label: string
|
||||
@@ -16,8 +23,9 @@ class OpenAIFunctionAgent_Agents implements INode {
|
||||
category: string
|
||||
baseClasses: string[]
|
||||
inputs: INodeParams[]
|
||||
sessionId?: string
|
||||
|
||||
constructor() {
|
||||
constructor(fields: { sessionId?: string }) {
|
||||
this.label = 'OpenAI Function Agent'
|
||||
this.name = 'openAIFunctionAgent'
|
||||
this.version = 3.0
|
||||
@@ -25,7 +33,7 @@ class OpenAIFunctionAgent_Agents implements INode {
|
||||
this.category = 'Agents'
|
||||
this.icon = 'function.svg'
|
||||
this.description = `An agent that uses Function Calling to pick the tool and args to call`
|
||||
this.baseClasses = [this.type, ...getBaseClasses(AgentExecutor)]
|
||||
this.baseClasses = [this.type, ...getBaseClasses(LCAgentExecutor)]
|
||||
this.inputs = [
|
||||
{
|
||||
label: 'Allowed Tools',
|
||||
@@ -52,54 +60,324 @@ class OpenAIFunctionAgent_Agents implements INode {
|
||||
additionalParams: true
|
||||
}
|
||||
]
|
||||
this.sessionId = fields?.sessionId
|
||||
}
|
||||
|
||||
async init(nodeData: INodeData): Promise<any> {
|
||||
const model = nodeData.inputs?.model as BaseLanguageModel
|
||||
const memory = nodeData.inputs?.memory as BaseChatMemory
|
||||
const systemMessage = nodeData.inputs?.systemMessage as string
|
||||
const memory = nodeData.inputs?.memory as FlowiseMemory
|
||||
|
||||
let tools = nodeData.inputs?.tools
|
||||
tools = flatten(tools)
|
||||
|
||||
const executor = await initializeAgentExecutorWithOptions(tools, model, {
|
||||
agentType: 'openai-functions',
|
||||
verbose: process.env.DEBUG === 'true' ? true : false,
|
||||
agentArgs: {
|
||||
prefix: systemMessage ?? `You are a helpful AI assistant.`
|
||||
}
|
||||
})
|
||||
const executor = prepareAgent(nodeData, this.sessionId)
|
||||
if (memory) executor.memory = memory
|
||||
|
||||
return executor
|
||||
}
|
||||
|
||||
async run(nodeData: INodeData, input: string, options: ICommonObject): Promise<string> {
|
||||
const executor = nodeData.instance as AgentExecutor
|
||||
const memory = nodeData.inputs?.memory as BaseChatMemory
|
||||
const memory = nodeData.inputs?.memory as FlowiseMemory
|
||||
|
||||
if (options && options.chatHistory) {
|
||||
const chatHistoryClassName = memory.chatHistory.constructor.name
|
||||
// Only replace when its In-Memory
|
||||
if (chatHistoryClassName && chatHistoryClassName === 'ChatMessageHistory') {
|
||||
memory.chatHistory = mapChatHistory(options)
|
||||
executor.memory = memory
|
||||
}
|
||||
}
|
||||
|
||||
;(executor.memory as any).returnMessages = true // Return true for BaseChatModel
|
||||
const executor = prepareAgent(nodeData, this.sessionId)
|
||||
|
||||
const loggerHandler = new ConsoleCallbackHandler(options.logger)
|
||||
const callbacks = await additionalCallbacks(nodeData, options)
|
||||
|
||||
let res: ChainValues = {}
|
||||
|
||||
if (options.socketIO && options.socketIOClientId) {
|
||||
const handler = new CustomChainHandler(options.socketIO, options.socketIOClientId)
|
||||
const result = await executor.run(input, [loggerHandler, handler, ...callbacks])
|
||||
return result
|
||||
res = await executor.invoke({ input }, { callbacks: [loggerHandler, handler, ...callbacks] })
|
||||
} else {
|
||||
const result = await executor.run(input, [loggerHandler, ...callbacks])
|
||||
return result
|
||||
res = await executor.invoke({ input }, { callbacks: [loggerHandler, ...callbacks] })
|
||||
}
|
||||
|
||||
await memory.addChatMessages(
|
||||
[
|
||||
{
|
||||
text: input,
|
||||
type: 'userMessage'
|
||||
},
|
||||
{
|
||||
text: res?.output,
|
||||
type: 'apiMessage'
|
||||
}
|
||||
],
|
||||
this.sessionId
|
||||
)
|
||||
|
||||
return res?.output
|
||||
}
|
||||
}
|
||||
|
||||
const formatAgentSteps = (steps: AgentStep[]): BaseMessage[] =>
|
||||
steps.flatMap(({ action, observation }) => {
|
||||
if ('messageLog' in action && action.messageLog !== undefined) {
|
||||
const log = action.messageLog as BaseMessage[]
|
||||
return log.concat(new FunctionMessage(observation, action.tool))
|
||||
} else {
|
||||
return [new AIMessage(action.log)]
|
||||
}
|
||||
})
|
||||
|
||||
const prepareAgent = (nodeData: INodeData, sessionId?: string) => {
|
||||
const model = nodeData.inputs?.model as ChatOpenAI
|
||||
const memory = nodeData.inputs?.memory as FlowiseMemory
|
||||
const systemMessage = nodeData.inputs?.systemMessage as string
|
||||
let tools = nodeData.inputs?.tools
|
||||
tools = flatten(tools)
|
||||
const memoryKey = memory.memoryKey ? memory.memoryKey : 'chat_history'
|
||||
const inputKey = memory.inputKey ? memory.inputKey : 'input'
|
||||
|
||||
const prompt = ChatPromptTemplate.fromMessages([
|
||||
['ai', systemMessage ? systemMessage : `You are a helpful AI assistant.`],
|
||||
new MessagesPlaceholder(memoryKey),
|
||||
['human', `{${inputKey}}`],
|
||||
new MessagesPlaceholder('agent_scratchpad')
|
||||
])
|
||||
|
||||
const modelWithFunctions = model.bind({
|
||||
functions: [...tools.map((tool: any) => formatToOpenAIFunction(tool))]
|
||||
})
|
||||
|
||||
const runnableAgent = RunnableSequence.from([
|
||||
{
|
||||
[inputKey]: (i: { input: string; steps: AgentStep[] }) => i.input,
|
||||
agent_scratchpad: (i: { input: string; steps: AgentStep[] }) => formatAgentSteps(i.steps),
|
||||
[memoryKey]: async (_: { input: string; steps: AgentStep[] }) => {
|
||||
const messages = (await memory.getChatMessages(sessionId, true)) as BaseMessage[]
|
||||
return messages ?? []
|
||||
}
|
||||
},
|
||||
prompt,
|
||||
modelWithFunctions,
|
||||
new OpenAIFunctionsAgentOutputParser()
|
||||
])
|
||||
|
||||
const executor = AgentExecutor.fromAgentAndTools({
|
||||
agent: runnableAgent,
|
||||
tools,
|
||||
sessionId
|
||||
})
|
||||
|
||||
return executor
|
||||
}
|
||||
|
||||
type AgentExecutorOutput = ChainValues
|
||||
|
||||
class AgentExecutor extends LCAgentExecutor {
|
||||
sessionId?: string
|
||||
|
||||
static fromAgentAndTools(fields: AgentExecutorInput & { sessionId?: string }): AgentExecutor {
|
||||
const newInstance = new AgentExecutor(fields)
|
||||
if (fields.sessionId) newInstance.sessionId = fields.sessionId
|
||||
return newInstance
|
||||
}
|
||||
|
||||
shouldContinueIteration(iterations: number): boolean {
|
||||
return this.maxIterations === undefined || iterations < this.maxIterations
|
||||
}
|
||||
|
||||
async _call(inputs: ChainValues, runManager?: CallbackManagerForChainRun): Promise<AgentExecutorOutput> {
|
||||
const toolsByName = Object.fromEntries(this.tools.map((t) => [t.name.toLowerCase(), t]))
|
||||
|
||||
const steps: AgentStep[] = []
|
||||
let iterations = 0
|
||||
|
||||
const getOutput = async (finishStep: AgentFinish): Promise<AgentExecutorOutput> => {
|
||||
const { returnValues } = finishStep
|
||||
const additional = await this.agent.prepareForOutput(returnValues, steps)
|
||||
|
||||
if (this.returnIntermediateSteps) {
|
||||
return { ...returnValues, intermediateSteps: steps, ...additional }
|
||||
}
|
||||
await runManager?.handleAgentEnd(finishStep)
|
||||
return { ...returnValues, ...additional }
|
||||
}
|
||||
|
||||
while (this.shouldContinueIteration(iterations)) {
|
||||
let output
|
||||
try {
|
||||
output = await this.agent.plan(steps, inputs, runManager?.getChild())
|
||||
} catch (e) {
|
||||
if (e instanceof OutputParserException) {
|
||||
let observation
|
||||
let text = e.message
|
||||
if (this.handleParsingErrors === true) {
|
||||
if (e.sendToLLM) {
|
||||
observation = e.observation
|
||||
text = e.llmOutput ?? ''
|
||||
} else {
|
||||
observation = 'Invalid or incomplete response'
|
||||
}
|
||||
} else if (typeof this.handleParsingErrors === 'string') {
|
||||
observation = this.handleParsingErrors
|
||||
} else if (typeof this.handleParsingErrors === 'function') {
|
||||
observation = this.handleParsingErrors(e)
|
||||
} else {
|
||||
throw e
|
||||
}
|
||||
output = {
|
||||
tool: '_Exception',
|
||||
toolInput: observation,
|
||||
log: text
|
||||
} as AgentAction
|
||||
} else {
|
||||
throw e
|
||||
}
|
||||
}
|
||||
// Check if the agent has finished
|
||||
if ('returnValues' in output) {
|
||||
return getOutput(output)
|
||||
}
|
||||
|
||||
let actions: AgentAction[]
|
||||
if (Array.isArray(output)) {
|
||||
actions = output as AgentAction[]
|
||||
} else {
|
||||
actions = [output as AgentAction]
|
||||
}
|
||||
|
||||
const newSteps = await Promise.all(
|
||||
actions.map(async (action) => {
|
||||
await runManager?.handleAgentAction(action)
|
||||
const tool = action.tool === '_Exception' ? new ExceptionTool() : toolsByName[action.tool?.toLowerCase()]
|
||||
let observation
|
||||
try {
|
||||
// here we need to override Tool call method to include sessionId as parameter
|
||||
observation = tool
|
||||
? // @ts-ignore
|
||||
await tool.call(action.toolInput, runManager?.getChild(), undefined, this.sessionId)
|
||||
: `${action.tool} is not a valid tool, try another one.`
|
||||
} catch (e) {
|
||||
if (e instanceof ToolInputParsingException) {
|
||||
if (this.handleParsingErrors === true) {
|
||||
observation = 'Invalid or incomplete tool input. Please try again.'
|
||||
} else if (typeof this.handleParsingErrors === 'string') {
|
||||
observation = this.handleParsingErrors
|
||||
} else if (typeof this.handleParsingErrors === 'function') {
|
||||
observation = this.handleParsingErrors(e)
|
||||
} else {
|
||||
throw e
|
||||
}
|
||||
observation = await new ExceptionTool().call(observation, runManager?.getChild())
|
||||
return { action, observation: observation ?? '' }
|
||||
}
|
||||
}
|
||||
return { action, observation: observation ?? '' }
|
||||
})
|
||||
)
|
||||
|
||||
steps.push(...newSteps)
|
||||
|
||||
const lastStep = steps[steps.length - 1]
|
||||
const lastTool = toolsByName[lastStep.action.tool?.toLowerCase()]
|
||||
|
||||
if (lastTool?.returnDirect) {
|
||||
return getOutput({
|
||||
returnValues: { [this.agent.returnValues[0]]: lastStep.observation },
|
||||
log: ''
|
||||
})
|
||||
}
|
||||
|
||||
iterations += 1
|
||||
}
|
||||
|
||||
const finish = await this.agent.returnStoppedResponse(this.earlyStoppingMethod, steps, inputs)
|
||||
|
||||
return getOutput(finish)
|
||||
}
|
||||
|
||||
async _takeNextStep(
|
||||
nameToolMap: Record<string, Tool>,
|
||||
inputs: ChainValues,
|
||||
intermediateSteps: AgentStep[],
|
||||
runManager?: CallbackManagerForChainRun
|
||||
): Promise<AgentFinish | AgentStep[]> {
|
||||
let output
|
||||
try {
|
||||
output = await this.agent.plan(intermediateSteps, inputs, runManager?.getChild())
|
||||
} catch (e) {
|
||||
if (e instanceof OutputParserException) {
|
||||
let observation
|
||||
let text = e.message
|
||||
if (this.handleParsingErrors === true) {
|
||||
if (e.sendToLLM) {
|
||||
observation = e.observation
|
||||
text = e.llmOutput ?? ''
|
||||
} else {
|
||||
observation = 'Invalid or incomplete response'
|
||||
}
|
||||
} else if (typeof this.handleParsingErrors === 'string') {
|
||||
observation = this.handleParsingErrors
|
||||
} else if (typeof this.handleParsingErrors === 'function') {
|
||||
observation = this.handleParsingErrors(e)
|
||||
} else {
|
||||
throw e
|
||||
}
|
||||
output = {
|
||||
tool: '_Exception',
|
||||
toolInput: observation,
|
||||
log: text
|
||||
} as AgentAction
|
||||
} else {
|
||||
throw e
|
||||
}
|
||||
}
|
||||
|
||||
if ('returnValues' in output) {
|
||||
return output
|
||||
}
|
||||
|
||||
let actions: AgentAction[]
|
||||
if (Array.isArray(output)) {
|
||||
actions = output as AgentAction[]
|
||||
} else {
|
||||
actions = [output as AgentAction]
|
||||
}
|
||||
|
||||
const result: AgentStep[] = []
|
||||
for (const agentAction of actions) {
|
||||
let observation = ''
|
||||
if (runManager) {
|
||||
await runManager?.handleAgentAction(agentAction)
|
||||
}
|
||||
if (agentAction.tool in nameToolMap) {
|
||||
const tool = nameToolMap[agentAction.tool]
|
||||
try {
|
||||
// here we need to override Tool call method to include sessionId as parameter
|
||||
// @ts-ignore
|
||||
observation = await tool.call(agentAction.toolInput, runManager?.getChild(), undefined, this.sessionId)
|
||||
} catch (e) {
|
||||
if (e instanceof ToolInputParsingException) {
|
||||
if (this.handleParsingErrors === true) {
|
||||
observation = 'Invalid or incomplete tool input. Please try again.'
|
||||
} else if (typeof this.handleParsingErrors === 'string') {
|
||||
observation = this.handleParsingErrors
|
||||
} else if (typeof this.handleParsingErrors === 'function') {
|
||||
observation = this.handleParsingErrors(e)
|
||||
} else {
|
||||
throw e
|
||||
}
|
||||
observation = await new ExceptionTool().call(observation, runManager?.getChild())
|
||||
}
|
||||
}
|
||||
} else {
|
||||
observation = `${agentAction.tool} is not a valid tool, try another available tool: ${Object.keys(nameToolMap).join(', ')}`
|
||||
}
|
||||
result.push({
|
||||
action: agentAction,
|
||||
observation
|
||||
})
|
||||
}
|
||||
return result
|
||||
}
|
||||
}
|
||||
|
||||
class ExceptionTool extends Tool {
|
||||
name = '_Exception'
|
||||
|
||||
description = 'Exception tool'
|
||||
|
||||
async _call(query: string) {
|
||||
return query
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import { INode, INodeData, INodeParams } from '../../../src/Interface'
|
||||
import { getBaseClasses } from '../../../src/utils'
|
||||
import { BufferMemory } from 'langchain/memory'
|
||||
import { FlowiseMemory, IMessage, INode, INodeData, INodeParams, MemoryMethods, MessageType } from '../../../src/Interface'
|
||||
import { convertBaseMessagetoIMessage, getBaseClasses } from '../../../src/utils'
|
||||
import { BufferMemory, BufferMemoryInput } from 'langchain/memory'
|
||||
import { BaseMessage } from 'langchain/schema'
|
||||
|
||||
class BufferMemory_Memory implements INode {
|
||||
label: string
|
||||
@@ -41,7 +42,7 @@ class BufferMemory_Memory implements INode {
|
||||
async init(nodeData: INodeData): Promise<any> {
|
||||
const memoryKey = nodeData.inputs?.memoryKey as string
|
||||
const inputKey = nodeData.inputs?.inputKey as string
|
||||
return new BufferMemory({
|
||||
return new BufferMemoryExtended({
|
||||
returnMessages: true,
|
||||
memoryKey,
|
||||
inputKey
|
||||
@@ -49,4 +50,41 @@ class BufferMemory_Memory implements INode {
|
||||
}
|
||||
}
|
||||
|
||||
class BufferMemoryExtended extends FlowiseMemory implements MemoryMethods {
|
||||
constructor(fields: BufferMemoryInput) {
|
||||
super(fields)
|
||||
}
|
||||
|
||||
async getChatMessages(_?: string, returnBaseMessages = false): Promise<IMessage[] | BaseMessage[]> {
|
||||
const memoryResult = await this.loadMemoryVariables({})
|
||||
const baseMessages = memoryResult[this.memoryKey ?? 'chat_history']
|
||||
return returnBaseMessages ? baseMessages : convertBaseMessagetoIMessage(baseMessages)
|
||||
}
|
||||
|
||||
async addChatMessages(msgArray: { text: string; type: MessageType }[]): Promise<void> {
|
||||
const input = msgArray.find((msg) => msg.type === 'userMessage')
|
||||
const output = msgArray.find((msg) => msg.type === 'apiMessage')
|
||||
|
||||
const inputValues = { [this.inputKey ?? 'input']: input?.text }
|
||||
const outputValues = { output: output?.text }
|
||||
|
||||
await this.saveContext(inputValues, outputValues)
|
||||
}
|
||||
|
||||
async clearChatMessages(): Promise<void> {
|
||||
await this.clear()
|
||||
}
|
||||
|
||||
async resumeMessages(messages: IMessage[]): Promise<void> {
|
||||
// Clear existing chatHistory to avoid duplication
|
||||
if (messages.length) await this.clear()
|
||||
|
||||
// Insert into chatHistory
|
||||
for (const msg of messages) {
|
||||
if (msg.type === 'userMessage') await this.chatHistory.addUserMessage(msg.message)
|
||||
else if (msg.type === 'apiMessage') await this.chatHistory.addAIChatMessage(msg.message)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = { nodeClass: BufferMemory_Memory }
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import { INode, INodeData, INodeParams } from '../../../src/Interface'
|
||||
import { getBaseClasses } from '../../../src/utils'
|
||||
import { FlowiseWindowMemory, IMessage, INode, INodeData, INodeParams, MemoryMethods, MessageType } from '../../../src/Interface'
|
||||
import { convertBaseMessagetoIMessage, getBaseClasses } from '../../../src/utils'
|
||||
import { BufferWindowMemory, BufferWindowMemoryInput } from 'langchain/memory'
|
||||
import { BaseMessage } from 'langchain/schema'
|
||||
|
||||
class BufferWindowMemory_Memory implements INode {
|
||||
label: string
|
||||
@@ -57,7 +58,44 @@ class BufferWindowMemory_Memory implements INode {
|
||||
k: parseInt(k, 10)
|
||||
}
|
||||
|
||||
return new BufferWindowMemory(obj)
|
||||
return new BufferWindowMemoryExtended(obj)
|
||||
}
|
||||
}
|
||||
|
||||
class BufferWindowMemoryExtended extends FlowiseWindowMemory implements MemoryMethods {
|
||||
constructor(fields: BufferWindowMemoryInput) {
|
||||
super(fields)
|
||||
}
|
||||
|
||||
async getChatMessages(_?: string, returnBaseMessages = false): Promise<IMessage[] | BaseMessage[]> {
|
||||
const memoryResult = await this.loadMemoryVariables({})
|
||||
const baseMessages = memoryResult[this.memoryKey ?? 'chat_history']
|
||||
return returnBaseMessages ? baseMessages : convertBaseMessagetoIMessage(baseMessages)
|
||||
}
|
||||
|
||||
async addChatMessages(msgArray: { text: string; type: MessageType }[]): Promise<void> {
|
||||
const input = msgArray.find((msg) => msg.type === 'userMessage')
|
||||
const output = msgArray.find((msg) => msg.type === 'apiMessage')
|
||||
|
||||
const inputValues = { [this.inputKey ?? 'input']: input?.text }
|
||||
const outputValues = { output: output?.text }
|
||||
|
||||
await this.saveContext(inputValues, outputValues)
|
||||
}
|
||||
|
||||
async clearChatMessages(): Promise<void> {
|
||||
await this.clear()
|
||||
}
|
||||
|
||||
async resumeMessages(messages: IMessage[]): Promise<void> {
|
||||
// Clear existing chatHistory to avoid duplication
|
||||
if (messages.length) await this.clear()
|
||||
|
||||
// Insert into chatHistory
|
||||
for (const msg of messages) {
|
||||
if (msg.type === 'userMessage') await this.chatHistory.addUserMessage(msg.message)
|
||||
else if (msg.type === 'apiMessage') await this.chatHistory.addAIChatMessage(msg.message)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
+45
-3
@@ -1,7 +1,8 @@
|
||||
import { INode, INodeData, INodeParams } from '../../../src/Interface'
|
||||
import { getBaseClasses } from '../../../src/utils'
|
||||
import { FlowiseSummaryMemory, IMessage, INode, INodeData, INodeParams, MemoryMethods, MessageType } from '../../../src/Interface'
|
||||
import { convertBaseMessagetoIMessage, getBaseClasses } from '../../../src/utils'
|
||||
import { ConversationSummaryMemory, ConversationSummaryMemoryInput } from 'langchain/memory'
|
||||
import { BaseLanguageModel } from 'langchain/base_language'
|
||||
import { BaseMessage } from 'langchain/schema'
|
||||
|
||||
class ConversationSummaryMemory_Memory implements INode {
|
||||
label: string
|
||||
@@ -56,7 +57,48 @@ class ConversationSummaryMemory_Memory implements INode {
|
||||
inputKey
|
||||
}
|
||||
|
||||
return new ConversationSummaryMemory(obj)
|
||||
return new ConversationSummaryMemoryExtended(obj)
|
||||
}
|
||||
}
|
||||
|
||||
class ConversationSummaryMemoryExtended extends FlowiseSummaryMemory implements MemoryMethods {
|
||||
constructor(fields: ConversationSummaryMemoryInput) {
|
||||
super(fields)
|
||||
}
|
||||
|
||||
async getChatMessages(_?: string, returnBaseMessages = false): Promise<IMessage[] | BaseMessage[]> {
|
||||
const memoryResult = await this.loadMemoryVariables({})
|
||||
const baseMessages = memoryResult[this.memoryKey ?? 'chat_history']
|
||||
return returnBaseMessages ? baseMessages : convertBaseMessagetoIMessage(baseMessages)
|
||||
}
|
||||
|
||||
async addChatMessages(msgArray: { text: string; type: MessageType }[]): Promise<void> {
|
||||
const input = msgArray.find((msg) => msg.type === 'userMessage')
|
||||
const output = msgArray.find((msg) => msg.type === 'apiMessage')
|
||||
|
||||
const inputValues = { [this.inputKey ?? 'input']: input?.text }
|
||||
const outputValues = { output: output?.text }
|
||||
|
||||
await this.saveContext(inputValues, outputValues)
|
||||
}
|
||||
|
||||
async clearChatMessages(): Promise<void> {
|
||||
await this.clear()
|
||||
}
|
||||
|
||||
async resumeMessages(messages: IMessage[]): Promise<void> {
|
||||
// Clear existing chatHistory to avoid duplication
|
||||
if (messages.length) await this.clear()
|
||||
|
||||
// Insert into chatHistory
|
||||
for (const msg of messages) {
|
||||
if (msg.type === 'userMessage') await this.chatHistory.addUserMessage(msg.message)
|
||||
else if (msg.type === 'apiMessage') await this.chatHistory.addAIChatMessage(msg.message)
|
||||
}
|
||||
|
||||
// Replace buffer
|
||||
const chatMessages = await this.chatHistory.getMessages()
|
||||
this.buffer = await this.predictNewSummary(chatMessages.slice(-2), this.buffer)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,15 +1,25 @@
|
||||
import {
|
||||
ICommonObject,
|
||||
INode,
|
||||
INodeData,
|
||||
INodeParams,
|
||||
DynamoDBClient,
|
||||
DynamoDBClientConfig,
|
||||
GetItemCommand,
|
||||
GetItemCommandInput,
|
||||
UpdateItemCommand,
|
||||
UpdateItemCommandInput,
|
||||
DeleteItemCommand,
|
||||
DeleteItemCommandInput,
|
||||
AttributeValue
|
||||
} from '@aws-sdk/client-dynamodb'
|
||||
import { DynamoDBChatMessageHistory } from 'langchain/stores/message/dynamodb'
|
||||
import { BufferMemory, BufferMemoryInput } from 'langchain/memory'
|
||||
import { mapStoredMessageToChatMessage, AIMessage, HumanMessage, StoredMessage, BaseMessage } from 'langchain/schema'
|
||||
import {
|
||||
convertBaseMessagetoIMessage,
|
||||
getBaseClasses,
|
||||
getCredentialData,
|
||||
getCredentialParam,
|
||||
serializeChatHistory
|
||||
} from '../../../src'
|
||||
import { DynamoDBChatMessageHistory } from 'langchain/stores/message/dynamodb'
|
||||
import { BufferMemory, BufferMemoryInput } from 'langchain/memory'
|
||||
} from '../../../src/utils'
|
||||
import { FlowiseMemory, ICommonObject, IMessage, INode, INodeData, INodeParams, MemoryMethods, MessageType } from '../../../src/Interface'
|
||||
|
||||
class DynamoDb_Memory implements INode {
|
||||
label: string
|
||||
@@ -102,49 +112,203 @@ class DynamoDb_Memory implements INode {
|
||||
const initalizeDynamoDB = async (nodeData: INodeData, options: ICommonObject): Promise<BufferMemory> => {
|
||||
const tableName = nodeData.inputs?.tableName as string
|
||||
const partitionKey = nodeData.inputs?.partitionKey as string
|
||||
const sessionId = nodeData.inputs?.sessionId as string
|
||||
const region = nodeData.inputs?.region as string
|
||||
const memoryKey = nodeData.inputs?.memoryKey as string
|
||||
const chatId = options.chatId
|
||||
|
||||
let isSessionIdUsingChatMessageId = false
|
||||
if (!sessionId && chatId) isSessionIdUsingChatMessageId = true
|
||||
let sessionId = ''
|
||||
|
||||
if (!nodeData.inputs?.sessionId && chatId) {
|
||||
isSessionIdUsingChatMessageId = true
|
||||
sessionId = chatId
|
||||
} else {
|
||||
sessionId = nodeData.inputs?.sessionId
|
||||
}
|
||||
|
||||
const credentialData = await getCredentialData(nodeData.credential ?? '', options)
|
||||
const accessKeyId = getCredentialParam('accessKey', credentialData, nodeData)
|
||||
const secretAccessKey = getCredentialParam('secretAccessKey', credentialData, nodeData)
|
||||
|
||||
const config: DynamoDBClientConfig = {
|
||||
region,
|
||||
credentials: {
|
||||
accessKeyId,
|
||||
secretAccessKey
|
||||
}
|
||||
}
|
||||
|
||||
const client = new DynamoDBClient(config ?? {})
|
||||
|
||||
const dynamoDb = new DynamoDBChatMessageHistory({
|
||||
tableName,
|
||||
partitionKey,
|
||||
sessionId: sessionId ? sessionId : chatId,
|
||||
config: {
|
||||
region,
|
||||
credentials: {
|
||||
accessKeyId,
|
||||
secretAccessKey
|
||||
}
|
||||
}
|
||||
sessionId,
|
||||
config
|
||||
})
|
||||
|
||||
const memory = new BufferMemoryExtended({
|
||||
memoryKey: memoryKey ?? 'chat_history',
|
||||
chatHistory: dynamoDb,
|
||||
isSessionIdUsingChatMessageId
|
||||
isSessionIdUsingChatMessageId,
|
||||
sessionId,
|
||||
dynamodbClient: client
|
||||
})
|
||||
return memory
|
||||
}
|
||||
|
||||
interface BufferMemoryExtendedInput {
|
||||
isSessionIdUsingChatMessageId: boolean
|
||||
dynamodbClient: DynamoDBClient
|
||||
sessionId: string
|
||||
}
|
||||
|
||||
class BufferMemoryExtended extends BufferMemory {
|
||||
isSessionIdUsingChatMessageId? = false
|
||||
interface DynamoDBSerializedChatMessage {
|
||||
M: {
|
||||
type: {
|
||||
S: string
|
||||
}
|
||||
text: {
|
||||
S: string
|
||||
}
|
||||
role?: {
|
||||
S: string
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
constructor(fields: BufferMemoryInput & Partial<BufferMemoryExtendedInput>) {
|
||||
class BufferMemoryExtended extends FlowiseMemory implements MemoryMethods {
|
||||
isSessionIdUsingChatMessageId = false
|
||||
sessionId = ''
|
||||
dynamodbClient: DynamoDBClient
|
||||
|
||||
constructor(fields: BufferMemoryInput & BufferMemoryExtendedInput) {
|
||||
super(fields)
|
||||
this.isSessionIdUsingChatMessageId = fields.isSessionIdUsingChatMessageId
|
||||
this.sessionId = fields.sessionId
|
||||
this.dynamodbClient = fields.dynamodbClient
|
||||
}
|
||||
|
||||
overrideDynamoKey(overrideSessionId = '') {
|
||||
const existingDynamoKey = (this as any).dynamoKey
|
||||
const partitionKey = (this as any).partitionKey
|
||||
|
||||
let newDynamoKey: Record<string, AttributeValue> = {}
|
||||
|
||||
if (Object.keys(existingDynamoKey).includes(partitionKey)) {
|
||||
newDynamoKey[partitionKey] = { S: overrideSessionId }
|
||||
}
|
||||
|
||||
return Object.keys(newDynamoKey).length ? newDynamoKey : existingDynamoKey
|
||||
}
|
||||
|
||||
async addNewMessage(
|
||||
messages: StoredMessage[],
|
||||
client: DynamoDBClient,
|
||||
tableName = '',
|
||||
dynamoKey: Record<string, AttributeValue> = {},
|
||||
messageAttributeName = 'messages'
|
||||
) {
|
||||
const params: UpdateItemCommandInput = {
|
||||
TableName: tableName,
|
||||
Key: dynamoKey,
|
||||
ExpressionAttributeNames: {
|
||||
'#m': messageAttributeName
|
||||
},
|
||||
ExpressionAttributeValues: {
|
||||
':empty_list': {
|
||||
L: []
|
||||
},
|
||||
':m': {
|
||||
L: messages.map((message) => {
|
||||
const dynamoSerializedMessage: DynamoDBSerializedChatMessage = {
|
||||
M: {
|
||||
type: {
|
||||
S: message.type
|
||||
},
|
||||
text: {
|
||||
S: message.data.content
|
||||
}
|
||||
}
|
||||
}
|
||||
if (message.data.role) {
|
||||
dynamoSerializedMessage.M.role = { S: message.data.role }
|
||||
}
|
||||
return dynamoSerializedMessage
|
||||
})
|
||||
}
|
||||
},
|
||||
UpdateExpression: 'SET #m = list_append(if_not_exists(#m, :empty_list), :m)'
|
||||
}
|
||||
|
||||
await client.send(new UpdateItemCommand(params))
|
||||
}
|
||||
|
||||
async getChatMessages(overrideSessionId = '', returnBaseMessages = false): Promise<IMessage[] | BaseMessage[]> {
|
||||
if (!this.dynamodbClient) return []
|
||||
|
||||
const dynamoKey = overrideSessionId ? this.overrideDynamoKey(overrideSessionId) : (this as any).dynamoKey
|
||||
const tableName = (this as any).tableName
|
||||
const messageAttributeName = (this as any).messageAttributeName
|
||||
|
||||
const params: GetItemCommandInput = {
|
||||
TableName: tableName,
|
||||
Key: dynamoKey
|
||||
}
|
||||
|
||||
const response = await this.dynamodbClient.send(new GetItemCommand(params))
|
||||
const items = response.Item ? response.Item[messageAttributeName]?.L ?? [] : []
|
||||
const messages = items
|
||||
.map((item) => ({
|
||||
type: item.M?.type.S,
|
||||
data: {
|
||||
role: item.M?.role?.S,
|
||||
content: item.M?.text.S
|
||||
}
|
||||
}))
|
||||
.filter((x): x is StoredMessage => x.type !== undefined && x.data.content !== undefined)
|
||||
const baseMessages = messages.map(mapStoredMessageToChatMessage)
|
||||
return returnBaseMessages ? baseMessages : convertBaseMessagetoIMessage(baseMessages)
|
||||
}
|
||||
|
||||
async addChatMessages(msgArray: { text: string; type: MessageType }[], overrideSessionId = ''): Promise<void> {
|
||||
if (!this.dynamodbClient) return
|
||||
|
||||
const dynamoKey = overrideSessionId ? this.overrideDynamoKey(overrideSessionId) : (this as any).dynamoKey
|
||||
const tableName = (this as any).tableName
|
||||
const messageAttributeName = (this as any).messageAttributeName
|
||||
|
||||
const input = msgArray.find((msg) => msg.type === 'userMessage')
|
||||
const output = msgArray.find((msg) => msg.type === 'apiMessage')
|
||||
|
||||
if (input) {
|
||||
const newInputMessage = new HumanMessage(input.text)
|
||||
const messageToAdd = [newInputMessage].map((msg) => msg.toDict())
|
||||
await this.addNewMessage(messageToAdd, this.dynamodbClient, tableName, dynamoKey, messageAttributeName)
|
||||
}
|
||||
|
||||
if (output) {
|
||||
const newOutputMessage = new AIMessage(output.text)
|
||||
const messageToAdd = [newOutputMessage].map((msg) => msg.toDict())
|
||||
await this.addNewMessage(messageToAdd, this.dynamodbClient, tableName, dynamoKey, messageAttributeName)
|
||||
}
|
||||
}
|
||||
|
||||
async clearChatMessages(overrideSessionId = ''): Promise<void> {
|
||||
if (!this.dynamodbClient) return
|
||||
|
||||
const dynamoKey = overrideSessionId ? this.overrideDynamoKey(overrideSessionId) : (this as any).dynamoKey
|
||||
const tableName = (this as any).tableName
|
||||
|
||||
const params: DeleteItemCommandInput = {
|
||||
TableName: tableName,
|
||||
Key: dynamoKey
|
||||
}
|
||||
await this.dynamodbClient.send(new DeleteItemCommand(params))
|
||||
await this.clear()
|
||||
}
|
||||
|
||||
async resumeMessages(): Promise<void> {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,17 +1,15 @@
|
||||
import { MongoClient, Collection, Document } from 'mongodb'
|
||||
import { MongoDBChatMessageHistory } from 'langchain/stores/message/mongodb'
|
||||
import { BufferMemory, BufferMemoryInput } from 'langchain/memory'
|
||||
import { mapStoredMessageToChatMessage, AIMessage, HumanMessage, BaseMessage } from 'langchain/schema'
|
||||
import {
|
||||
convertBaseMessagetoIMessage,
|
||||
getBaseClasses,
|
||||
getCredentialData,
|
||||
getCredentialParam,
|
||||
ICommonObject,
|
||||
INode,
|
||||
INodeData,
|
||||
INodeParams,
|
||||
serializeChatHistory
|
||||
} from '../../../src'
|
||||
import { MongoDBChatMessageHistory } from 'langchain/stores/message/mongodb'
|
||||
import { BufferMemory, BufferMemoryInput } from 'langchain/memory'
|
||||
import { BaseMessage, mapStoredMessageToChatMessage } from 'langchain/schema'
|
||||
import { MongoClient } from 'mongodb'
|
||||
} from '../../../src/utils'
|
||||
import { FlowiseMemory, ICommonObject, IMessage, INode, INodeData, INodeParams, MemoryMethods, MessageType } from '../../../src/Interface'
|
||||
|
||||
class MongoDB_Memory implements INode {
|
||||
label: string
|
||||
@@ -99,23 +97,30 @@ class MongoDB_Memory implements INode {
|
||||
const initializeMongoDB = async (nodeData: INodeData, options: ICommonObject): Promise<BufferMemory> => {
|
||||
const databaseName = nodeData.inputs?.databaseName as string
|
||||
const collectionName = nodeData.inputs?.collectionName as string
|
||||
const sessionId = nodeData.inputs?.sessionId as string
|
||||
const memoryKey = nodeData.inputs?.memoryKey as string
|
||||
const chatId = options?.chatId as string
|
||||
|
||||
let isSessionIdUsingChatMessageId = false
|
||||
if (!sessionId && chatId) isSessionIdUsingChatMessageId = true
|
||||
let sessionId = ''
|
||||
|
||||
if (!nodeData.inputs?.sessionId && chatId) {
|
||||
isSessionIdUsingChatMessageId = true
|
||||
sessionId = chatId
|
||||
} else {
|
||||
sessionId = nodeData.inputs?.sessionId
|
||||
}
|
||||
|
||||
const credentialData = await getCredentialData(nodeData.credential ?? '', options)
|
||||
let mongoDBConnectUrl = getCredentialParam('mongoDBConnectUrl', credentialData, nodeData)
|
||||
const mongoDBConnectUrl = getCredentialParam('mongoDBConnectUrl', credentialData, nodeData)
|
||||
|
||||
const client = new MongoClient(mongoDBConnectUrl)
|
||||
await client.connect()
|
||||
|
||||
const collection = client.db(databaseName).collection(collectionName)
|
||||
|
||||
const mongoDBChatMessageHistory = new MongoDBChatMessageHistory({
|
||||
collection,
|
||||
sessionId: sessionId ? sessionId : chatId
|
||||
sessionId
|
||||
})
|
||||
|
||||
mongoDBChatMessageHistory.getMessages = async (): Promise<BaseMessage[]> => {
|
||||
@@ -144,20 +149,81 @@ const initializeMongoDB = async (nodeData: INodeData, options: ICommonObject): P
|
||||
return new BufferMemoryExtended({
|
||||
memoryKey: memoryKey ?? 'chat_history',
|
||||
chatHistory: mongoDBChatMessageHistory,
|
||||
isSessionIdUsingChatMessageId
|
||||
isSessionIdUsingChatMessageId,
|
||||
sessionId,
|
||||
collection
|
||||
})
|
||||
}
|
||||
|
||||
interface BufferMemoryExtendedInput {
|
||||
isSessionIdUsingChatMessageId: boolean
|
||||
collection: Collection<Document>
|
||||
sessionId: string
|
||||
}
|
||||
|
||||
class BufferMemoryExtended extends BufferMemory {
|
||||
class BufferMemoryExtended extends FlowiseMemory implements MemoryMethods {
|
||||
sessionId = ''
|
||||
collection: Collection<Document>
|
||||
isSessionIdUsingChatMessageId? = false
|
||||
|
||||
constructor(fields: BufferMemoryInput & Partial<BufferMemoryExtendedInput>) {
|
||||
constructor(fields: BufferMemoryInput & BufferMemoryExtendedInput) {
|
||||
super(fields)
|
||||
this.isSessionIdUsingChatMessageId = fields.isSessionIdUsingChatMessageId
|
||||
this.sessionId = fields.sessionId
|
||||
this.collection = fields.collection
|
||||
}
|
||||
|
||||
async getChatMessages(overrideSessionId = '', returnBaseMessages = false): Promise<IMessage[] | BaseMessage[]> {
|
||||
if (!this.collection) return []
|
||||
|
||||
const id = overrideSessionId ?? this.sessionId
|
||||
const document = await this.collection.findOne({ sessionId: id })
|
||||
const messages = document?.messages || []
|
||||
const baseMessages = messages.map(mapStoredMessageToChatMessage)
|
||||
return returnBaseMessages ? baseMessages : convertBaseMessagetoIMessage(baseMessages)
|
||||
}
|
||||
|
||||
async addChatMessages(msgArray: { text: string; type: MessageType }[], overrideSessionId = ''): Promise<void> {
|
||||
if (!this.collection) return
|
||||
|
||||
const id = overrideSessionId ?? this.sessionId
|
||||
const input = msgArray.find((msg) => msg.type === 'userMessage')
|
||||
const output = msgArray.find((msg) => msg.type === 'apiMessage')
|
||||
|
||||
if (input) {
|
||||
const newInputMessage = new HumanMessage(input.text)
|
||||
const messageToAdd = [newInputMessage].map((msg) => msg.toDict())
|
||||
await this.collection.updateOne(
|
||||
{ sessionId: id },
|
||||
{
|
||||
$push: { messages: { $each: messageToAdd } }
|
||||
},
|
||||
{ upsert: true }
|
||||
)
|
||||
}
|
||||
|
||||
if (output) {
|
||||
const newOutputMessage = new AIMessage(output.text)
|
||||
const messageToAdd = [newOutputMessage].map((msg) => msg.toDict())
|
||||
await this.collection.updateOne(
|
||||
{ sessionId: id },
|
||||
{
|
||||
$push: { messages: { $each: messageToAdd } }
|
||||
},
|
||||
{ upsert: true }
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
async clearChatMessages(overrideSessionId = ''): Promise<void> {
|
||||
if (!this.collection) return
|
||||
|
||||
const id = overrideSessionId ?? this.sessionId
|
||||
await this.collection.deleteOne({ sessionId: id })
|
||||
await this.clear()
|
||||
}
|
||||
|
||||
async resumeMessages(): Promise<void> {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
import { INode, INodeData, INodeParams } from '../../../src/Interface'
|
||||
import { getBaseClasses, getCredentialData, getCredentialParam } from '../../../src/utils'
|
||||
import { IMessage, INode, INodeData, INodeParams, MemoryMethods, MessageType } from '../../../src/Interface'
|
||||
import { convertBaseMessagetoIMessage, getBaseClasses, getCredentialData, getCredentialParam } from '../../../src/utils'
|
||||
import { ICommonObject } from '../../../src'
|
||||
import { MotorheadMemory, MotorheadMemoryInput } from 'langchain/memory'
|
||||
import { MotorheadMemory, MotorheadMemoryInput, InputValues, MemoryVariables, OutputValues, getBufferString } from 'langchain/memory'
|
||||
import fetch from 'node-fetch'
|
||||
import { getBufferString } from 'langchain/memory'
|
||||
import { BaseMessage } from 'langchain/schema'
|
||||
|
||||
class MotorMemory_Memory implements INode {
|
||||
label: string
|
||||
@@ -88,19 +88,26 @@ class MotorMemory_Memory implements INode {
|
||||
const initalizeMotorhead = async (nodeData: INodeData, options: ICommonObject): Promise<MotorheadMemory> => {
|
||||
const memoryKey = nodeData.inputs?.memoryKey as string
|
||||
const baseURL = nodeData.inputs?.baseURL as string
|
||||
const sessionId = nodeData.inputs?.sessionId as string
|
||||
const chatId = options?.chatId as string
|
||||
|
||||
let isSessionIdUsingChatMessageId = false
|
||||
if (!sessionId && chatId) isSessionIdUsingChatMessageId = true
|
||||
let sessionId = ''
|
||||
|
||||
if (!nodeData.inputs?.sessionId && chatId) {
|
||||
isSessionIdUsingChatMessageId = true
|
||||
sessionId = chatId
|
||||
} else {
|
||||
sessionId = nodeData.inputs?.sessionId
|
||||
}
|
||||
|
||||
const credentialData = await getCredentialData(nodeData.credential ?? '', options)
|
||||
const apiKey = getCredentialParam('apiKey', credentialData, nodeData)
|
||||
const clientId = getCredentialParam('clientId', credentialData, nodeData)
|
||||
|
||||
let obj: MotorheadMemoryInput & Partial<MotorheadMemoryExtendedInput> = {
|
||||
let obj: MotorheadMemoryInput & MotorheadMemoryExtendedInput = {
|
||||
returnMessages: true,
|
||||
sessionId: sessionId ? sessionId : chatId,
|
||||
isSessionIdUsingChatMessageId,
|
||||
sessionId,
|
||||
memoryKey
|
||||
}
|
||||
|
||||
@@ -117,8 +124,6 @@ const initalizeMotorhead = async (nodeData: INodeData, options: ICommonObject):
|
||||
}
|
||||
}
|
||||
|
||||
if (isSessionIdUsingChatMessageId) obj.isSessionIdUsingChatMessageId = true
|
||||
|
||||
const motorheadMemory = new MotorheadMemoryExtended(obj)
|
||||
|
||||
// Get messages from sessionId
|
||||
@@ -131,15 +136,32 @@ interface MotorheadMemoryExtendedInput {
|
||||
isSessionIdUsingChatMessageId: boolean
|
||||
}
|
||||
|
||||
class MotorheadMemoryExtended extends MotorheadMemory {
|
||||
class MotorheadMemoryExtended extends MotorheadMemory implements MemoryMethods {
|
||||
isSessionIdUsingChatMessageId? = false
|
||||
|
||||
constructor(fields: MotorheadMemoryInput & Partial<MotorheadMemoryExtendedInput>) {
|
||||
constructor(fields: MotorheadMemoryInput & MotorheadMemoryExtendedInput) {
|
||||
super(fields)
|
||||
this.isSessionIdUsingChatMessageId = fields.isSessionIdUsingChatMessageId
|
||||
}
|
||||
|
||||
async clear(): Promise<void> {
|
||||
async loadMemoryVariables(values: InputValues, overrideSessionId = ''): Promise<MemoryVariables> {
|
||||
if (overrideSessionId) {
|
||||
this.sessionId = overrideSessionId
|
||||
}
|
||||
return super.loadMemoryVariables({ values })
|
||||
}
|
||||
|
||||
async saveContext(inputValues: InputValues, outputValues: OutputValues, overrideSessionId = ''): Promise<void> {
|
||||
if (overrideSessionId) {
|
||||
this.sessionId = overrideSessionId
|
||||
}
|
||||
return super.saveContext(inputValues, outputValues)
|
||||
}
|
||||
|
||||
async clear(overrideSessionId = ''): Promise<void> {
|
||||
if (overrideSessionId) {
|
||||
this.sessionId = overrideSessionId
|
||||
}
|
||||
try {
|
||||
await this.caller.call(fetch, `${this.url}/sessions/${this.sessionId}/memory`, {
|
||||
//@ts-ignore
|
||||
@@ -155,6 +177,28 @@ class MotorheadMemoryExtended extends MotorheadMemory {
|
||||
await this.chatHistory.clear()
|
||||
await super.clear()
|
||||
}
|
||||
|
||||
async getChatMessages(overrideSessionId = '', returnBaseMessages = false): Promise<IMessage[] | BaseMessage[]> {
|
||||
const id = overrideSessionId ?? this.sessionId
|
||||
const memoryVariables = await this.loadMemoryVariables({}, id)
|
||||
const baseMessages = memoryVariables[this.memoryKey]
|
||||
return returnBaseMessages ? baseMessages : convertBaseMessagetoIMessage(baseMessages)
|
||||
}
|
||||
|
||||
async addChatMessages(msgArray: { text: string; type: MessageType }[], overrideSessionId = ''): Promise<void> {
|
||||
const id = overrideSessionId ?? this.sessionId
|
||||
const input = msgArray.find((msg) => msg.type === 'userMessage')
|
||||
const output = msgArray.find((msg) => msg.type === 'apiMessage')
|
||||
const inputValues = { [this.inputKey ?? 'input']: input?.text }
|
||||
const outputValues = { output: output?.text }
|
||||
|
||||
await this.saveContext(inputValues, outputValues, id)
|
||||
}
|
||||
|
||||
async clearChatMessages(overrideSessionId = ''): Promise<void> {
|
||||
const id = overrideSessionId ?? this.sessionId
|
||||
await this.clear(id)
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = { nodeClass: MotorMemory_Memory }
|
||||
|
||||
@@ -1,8 +1,14 @@
|
||||
import { INode, INodeData, INodeParams, ICommonObject } from '../../../src/Interface'
|
||||
import { getBaseClasses, getCredentialData, getCredentialParam, serializeChatHistory } from '../../../src/utils'
|
||||
import { INode, INodeData, INodeParams, ICommonObject, IMessage, MessageType, FlowiseMemory, MemoryMethods } from '../../../src/Interface'
|
||||
import {
|
||||
convertBaseMessagetoIMessage,
|
||||
getBaseClasses,
|
||||
getCredentialData,
|
||||
getCredentialParam,
|
||||
serializeChatHistory
|
||||
} from '../../../src/utils'
|
||||
import { BufferMemory, BufferMemoryInput } from 'langchain/memory'
|
||||
import { RedisChatMessageHistory, RedisChatMessageHistoryInput } from 'langchain/stores/message/ioredis'
|
||||
import { mapStoredMessageToChatMessage, BaseMessage } from 'langchain/schema'
|
||||
import { mapStoredMessageToChatMessage, BaseMessage, AIMessage, HumanMessage } from 'langchain/schema'
|
||||
import { Redis } from 'ioredis'
|
||||
|
||||
class RedisBackedChatMemory_Memory implements INode {
|
||||
@@ -94,14 +100,20 @@ class RedisBackedChatMemory_Memory implements INode {
|
||||
}
|
||||
|
||||
const initalizeRedis = async (nodeData: INodeData, options: ICommonObject): Promise<BufferMemory> => {
|
||||
const sessionId = nodeData.inputs?.sessionId as string
|
||||
const sessionTTL = nodeData.inputs?.sessionTTL as number
|
||||
const memoryKey = nodeData.inputs?.memoryKey as string
|
||||
const windowSize = nodeData.inputs?.windowSize as number
|
||||
const chatId = options?.chatId as string
|
||||
|
||||
let isSessionIdUsingChatMessageId = false
|
||||
if (!sessionId && chatId) isSessionIdUsingChatMessageId = true
|
||||
let sessionId = ''
|
||||
|
||||
if (!nodeData.inputs?.sessionId && chatId) {
|
||||
isSessionIdUsingChatMessageId = true
|
||||
sessionId = chatId
|
||||
} else {
|
||||
sessionId = nodeData.inputs?.sessionId
|
||||
}
|
||||
|
||||
const credentialData = await getCredentialData(nodeData.credential ?? '', options)
|
||||
const redisUrl = getCredentialParam('redisUrl', credentialData, nodeData)
|
||||
@@ -128,7 +140,7 @@ const initalizeRedis = async (nodeData: INodeData, options: ICommonObject): Prom
|
||||
}
|
||||
|
||||
let obj: RedisChatMessageHistoryInput = {
|
||||
sessionId: sessionId ? sessionId : chatId,
|
||||
sessionId,
|
||||
client
|
||||
}
|
||||
|
||||
@@ -162,21 +174,71 @@ const initalizeRedis = async (nodeData: INodeData, options: ICommonObject): Prom
|
||||
const memory = new BufferMemoryExtended({
|
||||
memoryKey: memoryKey ?? 'chat_history',
|
||||
chatHistory: redisChatMessageHistory,
|
||||
isSessionIdUsingChatMessageId
|
||||
isSessionIdUsingChatMessageId,
|
||||
sessionId,
|
||||
redisClient: client
|
||||
})
|
||||
return memory
|
||||
}
|
||||
|
||||
interface BufferMemoryExtendedInput {
|
||||
isSessionIdUsingChatMessageId: boolean
|
||||
redisClient: Redis
|
||||
sessionId: string
|
||||
}
|
||||
|
||||
class BufferMemoryExtended extends BufferMemory {
|
||||
class BufferMemoryExtended extends FlowiseMemory implements MemoryMethods {
|
||||
isSessionIdUsingChatMessageId? = false
|
||||
sessionId = ''
|
||||
redisClient: Redis
|
||||
|
||||
constructor(fields: BufferMemoryInput & Partial<BufferMemoryExtendedInput>) {
|
||||
constructor(fields: BufferMemoryInput & BufferMemoryExtendedInput) {
|
||||
super(fields)
|
||||
this.isSessionIdUsingChatMessageId = fields.isSessionIdUsingChatMessageId
|
||||
this.sessionId = fields.sessionId
|
||||
this.redisClient = fields.redisClient
|
||||
}
|
||||
|
||||
async getChatMessages(overrideSessionId = '', returnBaseMessage = false): Promise<IMessage[] | BaseMessage[]> {
|
||||
if (!this.redisClient) return []
|
||||
|
||||
const id = overrideSessionId ?? this.sessionId
|
||||
const rawStoredMessages = await this.redisClient.lrange(id, 0, -1)
|
||||
const orderedMessages = rawStoredMessages.reverse().map((message) => JSON.parse(message))
|
||||
const baseMessages = orderedMessages.map(mapStoredMessageToChatMessage)
|
||||
return returnBaseMessage ? baseMessages : convertBaseMessagetoIMessage(baseMessages)
|
||||
}
|
||||
|
||||
async addChatMessages(msgArray: { text: string; type: MessageType }[], overrideSessionId = ''): Promise<void> {
|
||||
if (!this.redisClient) return
|
||||
|
||||
const id = overrideSessionId ?? this.sessionId
|
||||
const input = msgArray.find((msg) => msg.type === 'userMessage')
|
||||
const output = msgArray.find((msg) => msg.type === 'apiMessage')
|
||||
|
||||
if (input) {
|
||||
const newInputMessage = new HumanMessage(input.text)
|
||||
const messageToAdd = [newInputMessage].map((msg) => msg.toDict())
|
||||
await this.redisClient.lpush(id, JSON.stringify(messageToAdd[0]))
|
||||
}
|
||||
|
||||
if (output) {
|
||||
const newOutputMessage = new AIMessage(output.text)
|
||||
const messageToAdd = [newOutputMessage].map((msg) => msg.toDict())
|
||||
await this.redisClient.lpush(id, JSON.stringify(messageToAdd[0]))
|
||||
}
|
||||
}
|
||||
|
||||
async clearChatMessages(overrideSessionId = ''): Promise<void> {
|
||||
if (!this.redisClient) return
|
||||
|
||||
const id = overrideSessionId ?? this.sessionId
|
||||
await this.redisClient.del(id)
|
||||
await this.clear()
|
||||
}
|
||||
|
||||
async resumeMessages(): Promise<void> {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
+80
-13
@@ -1,8 +1,16 @@
|
||||
import { INode, INodeData, INodeParams } from '../../../src/Interface'
|
||||
import { getBaseClasses, getCredentialData, getCredentialParam, serializeChatHistory } from '../../../src/utils'
|
||||
import { ICommonObject } from '../../../src'
|
||||
import { Redis } from '@upstash/redis'
|
||||
import { BufferMemory, BufferMemoryInput } from 'langchain/memory'
|
||||
import { UpstashRedisChatMessageHistory } from 'langchain/stores/message/upstash_redis'
|
||||
import { mapStoredMessageToChatMessage, AIMessage, HumanMessage, StoredMessage, BaseMessage } from 'langchain/schema'
|
||||
import { FlowiseMemory, IMessage, INode, INodeData, INodeParams, MemoryMethods, MessageType } from '../../../src/Interface'
|
||||
import {
|
||||
convertBaseMessagetoIMessage,
|
||||
getBaseClasses,
|
||||
getCredentialData,
|
||||
getCredentialParam,
|
||||
serializeChatHistory
|
||||
} from '../../../src/utils'
|
||||
import { ICommonObject } from '../../../src/Interface'
|
||||
|
||||
class UpstashRedisBackedChatMemory_Memory implements INode {
|
||||
label: string
|
||||
@@ -84,29 +92,39 @@ class UpstashRedisBackedChatMemory_Memory implements INode {
|
||||
|
||||
const initalizeUpstashRedis = async (nodeData: INodeData, options: ICommonObject): Promise<BufferMemory> => {
|
||||
const baseURL = nodeData.inputs?.baseURL as string
|
||||
const sessionId = nodeData.inputs?.sessionId as string
|
||||
const sessionTTL = nodeData.inputs?.sessionTTL as string
|
||||
const chatId = options?.chatId as string
|
||||
|
||||
let isSessionIdUsingChatMessageId = false
|
||||
if (!sessionId && chatId) isSessionIdUsingChatMessageId = true
|
||||
let sessionId = ''
|
||||
|
||||
if (!nodeData.inputs?.sessionId && chatId) {
|
||||
isSessionIdUsingChatMessageId = true
|
||||
sessionId = chatId
|
||||
} else {
|
||||
sessionId = nodeData.inputs?.sessionId
|
||||
}
|
||||
|
||||
const credentialData = await getCredentialData(nodeData.credential ?? '', options)
|
||||
const upstashRestToken = getCredentialParam('upstashRestToken', credentialData, nodeData)
|
||||
|
||||
const client = new Redis({
|
||||
url: baseURL,
|
||||
token: upstashRestToken
|
||||
})
|
||||
|
||||
const redisChatMessageHistory = new UpstashRedisChatMessageHistory({
|
||||
sessionId: sessionId ? sessionId : chatId,
|
||||
sessionId,
|
||||
sessionTTL: sessionTTL ? parseInt(sessionTTL, 10) : undefined,
|
||||
config: {
|
||||
url: baseURL,
|
||||
token: upstashRestToken
|
||||
}
|
||||
client
|
||||
})
|
||||
|
||||
const memory = new BufferMemoryExtended({
|
||||
memoryKey: 'chat_history',
|
||||
chatHistory: redisChatMessageHistory,
|
||||
isSessionIdUsingChatMessageId
|
||||
isSessionIdUsingChatMessageId,
|
||||
sessionId,
|
||||
redisClient: client
|
||||
})
|
||||
|
||||
return memory
|
||||
@@ -114,14 +132,63 @@ const initalizeUpstashRedis = async (nodeData: INodeData, options: ICommonObject
|
||||
|
||||
interface BufferMemoryExtendedInput {
|
||||
isSessionIdUsingChatMessageId: boolean
|
||||
redisClient: Redis
|
||||
sessionId: string
|
||||
}
|
||||
|
||||
class BufferMemoryExtended extends BufferMemory {
|
||||
class BufferMemoryExtended extends FlowiseMemory implements MemoryMethods {
|
||||
isSessionIdUsingChatMessageId? = false
|
||||
sessionId = ''
|
||||
redisClient: Redis
|
||||
|
||||
constructor(fields: BufferMemoryInput & Partial<BufferMemoryExtendedInput>) {
|
||||
constructor(fields: BufferMemoryInput & BufferMemoryExtendedInput) {
|
||||
super(fields)
|
||||
this.isSessionIdUsingChatMessageId = fields.isSessionIdUsingChatMessageId
|
||||
this.sessionId = fields.sessionId
|
||||
this.redisClient = fields.redisClient
|
||||
}
|
||||
|
||||
async getChatMessages(overrideSessionId = '', returnBaseMessages = false): Promise<IMessage[] | BaseMessage[]> {
|
||||
if (!this.redisClient) return []
|
||||
|
||||
const id = overrideSessionId ?? this.sessionId
|
||||
const rawStoredMessages: StoredMessage[] = await this.redisClient.lrange<StoredMessage>(id, 0, -1)
|
||||
const orderedMessages = rawStoredMessages.reverse()
|
||||
const previousMessages = orderedMessages.filter((x): x is StoredMessage => x.type !== undefined && x.data.content !== undefined)
|
||||
const baseMessages = previousMessages.map(mapStoredMessageToChatMessage)
|
||||
return returnBaseMessages ? baseMessages : convertBaseMessagetoIMessage(baseMessages)
|
||||
}
|
||||
|
||||
async addChatMessages(msgArray: { text: string; type: MessageType }[], overrideSessionId = ''): Promise<void> {
|
||||
if (!this.redisClient) return
|
||||
|
||||
const id = overrideSessionId ?? this.sessionId
|
||||
const input = msgArray.find((msg) => msg.type === 'userMessage')
|
||||
const output = msgArray.find((msg) => msg.type === 'apiMessage')
|
||||
|
||||
if (input) {
|
||||
const newInputMessage = new HumanMessage(input.text)
|
||||
const messageToAdd = [newInputMessage].map((msg) => msg.toDict())
|
||||
await this.redisClient.lpush(id, JSON.stringify(messageToAdd[0]))
|
||||
}
|
||||
|
||||
if (output) {
|
||||
const newOutputMessage = new AIMessage(output.text)
|
||||
const messageToAdd = [newOutputMessage].map((msg) => msg.toDict())
|
||||
await this.redisClient.lpush(id, JSON.stringify(messageToAdd[0]))
|
||||
}
|
||||
}
|
||||
|
||||
async clearChatMessages(overrideSessionId = ''): Promise<void> {
|
||||
if (!this.redisClient) return
|
||||
|
||||
const id = overrideSessionId ?? this.sessionId
|
||||
await this.redisClient.del(id)
|
||||
await this.clear()
|
||||
}
|
||||
|
||||
async resumeMessages(): Promise<void> {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
import { IMessage, INode, INodeData, INodeParams, MemoryMethods, MessageType } from '../../../src/Interface'
|
||||
import { convertBaseMessagetoIMessage, getBaseClasses, getCredentialData, getCredentialParam } from '../../../src/utils'
|
||||
import { ZepMemory, ZepMemoryInput } from 'langchain/memory/zep'
|
||||
import { getBufferString, InputValues, MemoryVariables, OutputValues } from 'langchain/memory'
|
||||
import { INode, INodeData, INodeParams } from '../../../src/Interface'
|
||||
import { getBaseClasses, getCredentialData, getCredentialParam } from '../../../src/utils'
|
||||
import { ICommonObject } from '../../../src'
|
||||
import { InputValues, MemoryVariables, OutputValues, getBufferString } from 'langchain/memory'
|
||||
import { BaseMessage } from 'langchain/schema'
|
||||
|
||||
class ZepMemory_Memory implements INode {
|
||||
label: string
|
||||
@@ -147,7 +148,7 @@ const initalizeZep = async (nodeData: INodeData, options: ICommonObject): Promis
|
||||
|
||||
const obj: ZepMemoryInput & ZepMemoryExtendedInput = {
|
||||
baseURL,
|
||||
sessionId: sessionId ? sessionId : chatId,
|
||||
sessionId,
|
||||
aiPrefix,
|
||||
humanPrefix,
|
||||
returnMessages: true,
|
||||
@@ -166,7 +167,7 @@ interface ZepMemoryExtendedInput {
|
||||
k?: number
|
||||
}
|
||||
|
||||
class ZepMemoryExtended extends ZepMemory {
|
||||
class ZepMemoryExtended extends ZepMemory implements MemoryMethods {
|
||||
isSessionIdUsingChatMessageId? = false
|
||||
lastN?: number
|
||||
|
||||
@@ -196,6 +197,28 @@ class ZepMemoryExtended extends ZepMemory {
|
||||
}
|
||||
return super.clear()
|
||||
}
|
||||
|
||||
async getChatMessages(overrideSessionId = '', returnBaseMessages = false): Promise<IMessage[] | BaseMessage[]> {
|
||||
const id = overrideSessionId ?? this.sessionId
|
||||
const memoryVariables = await this.loadMemoryVariables({}, id)
|
||||
const baseMessages = memoryVariables[this.memoryKey]
|
||||
return returnBaseMessages ? baseMessages : convertBaseMessagetoIMessage(baseMessages)
|
||||
}
|
||||
|
||||
async addChatMessages(msgArray: { text: string; type: MessageType }[], overrideSessionId = ''): Promise<void> {
|
||||
const id = overrideSessionId ?? this.sessionId
|
||||
const input = msgArray.find((msg) => msg.type === 'userMessage')
|
||||
const output = msgArray.find((msg) => msg.type === 'apiMessage')
|
||||
const inputValues = { [this.inputKey ?? 'input']: input?.text }
|
||||
const outputValues = { output: output?.text }
|
||||
|
||||
await this.saveContext(inputValues, outputValues, id)
|
||||
}
|
||||
|
||||
async clearChatMessages(overrideSessionId = ''): Promise<void> {
|
||||
const id = overrideSessionId ?? this.sessionId
|
||||
await this.clear(id)
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = { nodeClass: ZepMemory_Memory }
|
||||
|
||||
@@ -60,7 +60,7 @@ class CustomTool_Tools implements INode {
|
||||
}
|
||||
}
|
||||
|
||||
async init(nodeData: INodeData, _: string, options: ICommonObject): Promise<any> {
|
||||
async init(nodeData: INodeData, input: string, options: ICommonObject): Promise<any> {
|
||||
const selectedToolId = nodeData.inputs?.selectedTool as string
|
||||
const customToolFunc = nodeData.inputs?.customToolFunc as string
|
||||
|
||||
@@ -80,7 +80,36 @@ class CustomTool_Tools implements INode {
|
||||
code: tool.func
|
||||
}
|
||||
if (customToolFunc) obj.code = customToolFunc
|
||||
return new DynamicStructuredTool(obj)
|
||||
|
||||
const variables = await appDataSource.getRepository(databaseEntities['Variable']).find()
|
||||
|
||||
// override variables defined in overrideConfig
|
||||
// nodeData.inputs.variables is an Object, check each property and override the variable
|
||||
if (nodeData?.inputs?.vars) {
|
||||
for (const propertyName of Object.getOwnPropertyNames(nodeData.inputs.vars)) {
|
||||
const foundVar = variables.find((v) => v.name === propertyName)
|
||||
if (foundVar) {
|
||||
// even if the variable was defined as runtime, we override it with static value
|
||||
foundVar.type = 'static'
|
||||
foundVar.value = nodeData.inputs.vars[propertyName]
|
||||
} else {
|
||||
// add it the variables, if not found locally in the db
|
||||
variables.push({ name: propertyName, type: 'static', value: nodeData.inputs.vars[propertyName] })
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const flow = {
|
||||
chatId: options.chatId, // id is uppercase (I)
|
||||
chatflowId: options.chatflowid, // id is lowercase (i)
|
||||
input
|
||||
}
|
||||
|
||||
let dynamicStructuredTool = new DynamicStructuredTool(obj)
|
||||
dynamicStructuredTool.setVariables(variables)
|
||||
dynamicStructuredTool.setFlowObject(flow)
|
||||
|
||||
return dynamicStructuredTool
|
||||
} catch (e) {
|
||||
throw new Error(e)
|
||||
}
|
||||
|
||||
@@ -1,8 +1,18 @@
|
||||
import { z } from 'zod'
|
||||
import { CallbackManagerForToolRun } from 'langchain/callbacks'
|
||||
import { StructuredTool, ToolParams } from 'langchain/tools'
|
||||
import { NodeVM } from 'vm2'
|
||||
import { availableDependencies } from '../../../src/utils'
|
||||
import { RunnableConfig } from '@langchain/core/runnables'
|
||||
import { StructuredTool, ToolParams } from '@langchain/core/tools'
|
||||
import { CallbackManagerForToolRun, Callbacks, CallbackManager, parseCallbackConfigArg } from '@langchain/core/callbacks/manager'
|
||||
|
||||
class ToolInputParsingException extends Error {
|
||||
output?: string
|
||||
|
||||
constructor(message: string, output?: string) {
|
||||
super(message)
|
||||
this.output = output
|
||||
}
|
||||
}
|
||||
|
||||
export interface BaseDynamicToolInput extends ToolParams {
|
||||
name: string
|
||||
@@ -32,6 +42,8 @@ export class DynamicStructuredTool<
|
||||
func: DynamicStructuredToolInput['func']
|
||||
|
||||
schema: T
|
||||
private variables: any[]
|
||||
private flowObj: any
|
||||
|
||||
constructor(fields: DynamicStructuredToolInput<T>) {
|
||||
super(fields)
|
||||
@@ -43,7 +55,47 @@ export class DynamicStructuredTool<
|
||||
this.schema = fields.schema
|
||||
}
|
||||
|
||||
protected async _call(arg: z.output<T>): Promise<string> {
|
||||
async call(arg: z.output<T>, configArg?: RunnableConfig | Callbacks, tags?: string[], overrideSessionId?: string): Promise<string> {
|
||||
const config = parseCallbackConfigArg(configArg)
|
||||
if (config.runName === undefined) {
|
||||
config.runName = this.name
|
||||
}
|
||||
let parsed
|
||||
try {
|
||||
parsed = await this.schema.parseAsync(arg)
|
||||
} catch (e) {
|
||||
throw new ToolInputParsingException(`Received tool input did not match expected schema`, JSON.stringify(arg))
|
||||
}
|
||||
const callbackManager_ = await CallbackManager.configure(
|
||||
config.callbacks,
|
||||
this.callbacks,
|
||||
config.tags || tags,
|
||||
this.tags,
|
||||
config.metadata,
|
||||
this.metadata,
|
||||
{ verbose: this.verbose }
|
||||
)
|
||||
const runManager = await callbackManager_?.handleToolStart(
|
||||
this.toJSON(),
|
||||
typeof parsed === 'string' ? parsed : JSON.stringify(parsed),
|
||||
undefined,
|
||||
undefined,
|
||||
undefined,
|
||||
undefined,
|
||||
config.runName
|
||||
)
|
||||
let result
|
||||
try {
|
||||
result = await this._call(parsed, runManager, overrideSessionId)
|
||||
} catch (e) {
|
||||
await runManager?.handleToolError(e)
|
||||
throw e
|
||||
}
|
||||
await runManager?.handleToolEnd(result)
|
||||
return result
|
||||
}
|
||||
|
||||
protected async _call(arg: z.output<T>, _?: CallbackManagerForToolRun, overrideSessionId?: string): Promise<string> {
|
||||
let sandbox: any = {}
|
||||
if (typeof arg === 'object' && Object.keys(arg).length) {
|
||||
for (const item in arg) {
|
||||
@@ -51,6 +103,32 @@ export class DynamicStructuredTool<
|
||||
}
|
||||
}
|
||||
|
||||
// inject variables
|
||||
let vars = {}
|
||||
if (this.variables) {
|
||||
for (const item of this.variables) {
|
||||
let value = item.value
|
||||
|
||||
// read from .env file
|
||||
if (item.type === 'runtime') {
|
||||
value = process.env[item.name]
|
||||
}
|
||||
|
||||
Object.defineProperty(vars, item.name, {
|
||||
enumerable: true,
|
||||
configurable: true,
|
||||
writable: true,
|
||||
value: value
|
||||
})
|
||||
}
|
||||
}
|
||||
sandbox['$vars'] = vars
|
||||
|
||||
// inject flow properties
|
||||
if (this.flowObj) {
|
||||
sandbox['$flow'] = { ...this.flowObj, sessionId: overrideSessionId }
|
||||
}
|
||||
|
||||
const defaultAllowBuiltInDep = [
|
||||
'assert',
|
||||
'buffer',
|
||||
@@ -87,4 +165,12 @@ export class DynamicStructuredTool<
|
||||
|
||||
return response
|
||||
}
|
||||
|
||||
setVariables(variables: any[]) {
|
||||
this.variables = variables
|
||||
}
|
||||
|
||||
setFlowObject(flow: any) {
|
||||
this.flowObj = flow
|
||||
}
|
||||
}
|
||||
|
||||
@@ -73,6 +73,7 @@ export interface INodeParams {
|
||||
additionalParams?: boolean
|
||||
loadMethod?: string
|
||||
hidden?: boolean
|
||||
variables?: ICommonObject[]
|
||||
}
|
||||
|
||||
export interface INodeExecutionData {
|
||||
@@ -195,3 +196,37 @@ export class VectorStoreRetriever {
|
||||
this.vectorStore = fields.vectorStore
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Implement abstract classes and interface for memory
|
||||
*/
|
||||
import { BaseMessage } from 'langchain/schema'
|
||||
import { BufferMemory, BufferWindowMemory, ConversationSummaryMemory } from 'langchain/memory'
|
||||
|
||||
export interface MemoryMethods {
|
||||
getChatMessages(overrideSessionId?: string, returnBaseMessages?: boolean): Promise<IMessage[] | BaseMessage[]>
|
||||
addChatMessages(msgArray: { text: string; type: MessageType }[], overrideSessionId?: string): Promise<void>
|
||||
clearChatMessages(overrideSessionId?: string): Promise<void>
|
||||
resumeMessages?(messages: IMessage[]): Promise<void>
|
||||
}
|
||||
|
||||
export abstract class FlowiseMemory extends BufferMemory implements MemoryMethods {
|
||||
abstract getChatMessages(overrideSessionId?: string, returnBaseMessages?: boolean): Promise<IMessage[] | BaseMessage[]>
|
||||
abstract addChatMessages(msgArray: { text: string; type: MessageType }[], overrideSessionId?: string): Promise<void>
|
||||
abstract clearChatMessages(overrideSessionId?: string): Promise<void>
|
||||
abstract resumeMessages(messages: IMessage[]): Promise<void>
|
||||
}
|
||||
|
||||
export abstract class FlowiseWindowMemory extends BufferWindowMemory implements MemoryMethods {
|
||||
abstract getChatMessages(overrideSessionId?: string, returnBaseMessages?: boolean): Promise<IMessage[] | BaseMessage[]>
|
||||
abstract addChatMessages(msgArray: { text: string; type: MessageType }[], overrideSessionId?: string): Promise<void>
|
||||
abstract clearChatMessages(overrideSessionId?: string): Promise<void>
|
||||
abstract resumeMessages(messages: IMessage[]): Promise<void>
|
||||
}
|
||||
|
||||
export abstract class FlowiseSummaryMemory extends ConversationSummaryMemory implements MemoryMethods {
|
||||
abstract getChatMessages(overrideSessionId?: string, returnBaseMessages?: boolean): Promise<IMessage[] | BaseMessage[]>
|
||||
abstract addChatMessages(msgArray: { text: string; type: MessageType }[], overrideSessionId?: string): Promise<void>
|
||||
abstract clearChatMessages(overrideSessionId?: string): Promise<void>
|
||||
abstract resumeMessages(messages: IMessage[]): Promise<void>
|
||||
}
|
||||
|
||||
@@ -8,7 +8,7 @@ import { DataSource } from 'typeorm'
|
||||
import { ICommonObject, IDatabaseEntity, IMessage, INodeData } from './Interface'
|
||||
import { AES, enc } from 'crypto-js'
|
||||
import { ChatMessageHistory } from 'langchain/memory'
|
||||
import { AIMessage, HumanMessage } from 'langchain/schema'
|
||||
import { AIMessage, HumanMessage, BaseMessage } from 'langchain/schema'
|
||||
|
||||
export const numberOrExpressionRegex = '^(\\d+\\.?\\d*|{{.*}})$' //return true if string consists only numbers OR expression {{}}
|
||||
export const notEmptyRegex = '(.|\\s)*\\S(.|\\s)*' //return true if string is not empty or blank
|
||||
@@ -645,3 +645,31 @@ export const convertSchemaToZod = (schema: string | object): ICommonObject => {
|
||||
throw new Error(e)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert BaseMessage to IMessage
|
||||
* @param {BaseMessage[]} messages
|
||||
* @returns {IMessage[]}
|
||||
*/
|
||||
export const convertBaseMessagetoIMessage = (messages: BaseMessage[]): IMessage[] => {
|
||||
const formatmessages: IMessage[] = []
|
||||
for (const m of messages) {
|
||||
if (m._getType() === 'human') {
|
||||
formatmessages.push({
|
||||
message: m.content as string,
|
||||
type: 'userMessage'
|
||||
})
|
||||
} else if (m._getType() === 'ai') {
|
||||
formatmessages.push({
|
||||
message: m.content as string,
|
||||
type: 'apiMessage'
|
||||
})
|
||||
} else if (m._getType() === 'system') {
|
||||
formatmessages.push({
|
||||
message: m.content as string,
|
||||
type: 'apiMessage'
|
||||
})
|
||||
}
|
||||
}
|
||||
return formatmessages
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user