mirror of
https://github.com/farcasclaudiu/Flowise.git
synced 2026-06-28 19:00:59 +03:00
pass sessionId to custom tool
This commit is contained in:
@@ -1,10 +1,18 @@
|
||||
import { ICommonObject, INode, INodeData, INodeParams } from '../../../src/Interface'
|
||||
import { initializeAgentExecutorWithOptions, AgentExecutor } from 'langchain/agents'
|
||||
import { getBaseClasses, mapChatHistory } from '../../../src/utils'
|
||||
import { BaseLanguageModel } from 'langchain/base_language'
|
||||
import { AgentExecutor, AgentExecutorInput } from 'langchain/agents'
|
||||
import { ChainValues, AgentStep, AgentFinish, AgentAction, BaseMessage, FunctionMessage, AIMessage } from 'langchain/schema'
|
||||
import { OutputParserException } from 'langchain/schema/output_parser'
|
||||
import { CallbackManagerForChainRun } from 'langchain/callbacks'
|
||||
import { formatToOpenAIFunction } from 'langchain/tools'
|
||||
import { ToolInputParsingException, Tool } from '@langchain/core/tools'
|
||||
import { getBaseClasses } from '../../../src/utils'
|
||||
import { flatten } from 'lodash'
|
||||
import { BaseChatMemory } from 'langchain/memory'
|
||||
import { RunnableSequence } from 'langchain/schema/runnable'
|
||||
import { ConsoleCallbackHandler, CustomChainHandler, additionalCallbacks } from '../../../src/handler'
|
||||
import { ChatPromptTemplate, MessagesPlaceholder } from 'langchain/prompts'
|
||||
import { ChatOpenAI } from 'langchain/chat_models/openai'
|
||||
import { OpenAIFunctionsAgentOutputParser } from 'langchain/agents/openai/output_parser'
|
||||
|
||||
class OpenAIFunctionAgent_Agents implements INode {
|
||||
label: string
|
||||
@@ -16,8 +24,9 @@ class OpenAIFunctionAgent_Agents implements INode {
|
||||
category: string
|
||||
baseClasses: string[]
|
||||
inputs: INodeParams[]
|
||||
sessionId?: string
|
||||
|
||||
constructor() {
|
||||
constructor(fields: { sessionId?: string }) {
|
||||
this.label = 'OpenAI Function Agent'
|
||||
this.name = 'openAIFunctionAgent'
|
||||
this.version = 3.0
|
||||
@@ -52,54 +61,323 @@ class OpenAIFunctionAgent_Agents implements INode {
|
||||
additionalParams: true
|
||||
}
|
||||
]
|
||||
this.sessionId = fields?.sessionId
|
||||
}
|
||||
|
||||
async init(nodeData: INodeData): Promise<any> {
|
||||
const model = nodeData.inputs?.model as BaseLanguageModel
|
||||
const memory = nodeData.inputs?.memory as BaseChatMemory
|
||||
const systemMessage = nodeData.inputs?.systemMessage as string
|
||||
|
||||
let tools = nodeData.inputs?.tools
|
||||
tools = flatten(tools)
|
||||
|
||||
const executor = await initializeAgentExecutorWithOptions(tools, model, {
|
||||
agentType: 'openai-functions',
|
||||
verbose: process.env.DEBUG === 'true' ? true : false,
|
||||
agentArgs: {
|
||||
prefix: systemMessage ?? `You are a helpful AI assistant.`
|
||||
}
|
||||
})
|
||||
const executor = prepareAgent(nodeData, this.sessionId)
|
||||
if (memory) executor.memory = memory
|
||||
|
||||
return executor
|
||||
}
|
||||
|
||||
async run(nodeData: INodeData, input: string, options: ICommonObject): Promise<string> {
|
||||
const executor = nodeData.instance as AgentExecutor
|
||||
const memory = nodeData.inputs?.memory as BaseChatMemory
|
||||
const memory = nodeData.inputs?.memory
|
||||
|
||||
if (options && options.chatHistory) {
|
||||
const chatHistoryClassName = memory.chatHistory.constructor.name
|
||||
// Only replace when its In-Memory
|
||||
if (chatHistoryClassName && chatHistoryClassName === 'ChatMessageHistory') {
|
||||
memory.chatHistory = mapChatHistory(options)
|
||||
executor.memory = memory
|
||||
}
|
||||
}
|
||||
|
||||
;(executor.memory as any).returnMessages = true // Return true for BaseChatModel
|
||||
const executor = prepareAgent(nodeData, this.sessionId)
|
||||
|
||||
const loggerHandler = new ConsoleCallbackHandler(options.logger)
|
||||
const callbacks = await additionalCallbacks(nodeData, options)
|
||||
|
||||
let res: ChainValues = {}
|
||||
|
||||
if (options.socketIO && options.socketIOClientId) {
|
||||
const handler = new CustomChainHandler(options.socketIO, options.socketIOClientId)
|
||||
const result = await executor.run(input, [loggerHandler, handler, ...callbacks])
|
||||
return result
|
||||
res = await executor.invoke({ input }, { callbacks: [loggerHandler, handler, ...callbacks] })
|
||||
} else {
|
||||
const result = await executor.run(input, [loggerHandler, ...callbacks])
|
||||
return result
|
||||
res = await executor.invoke({ input }, { callbacks: [loggerHandler, ...callbacks] })
|
||||
}
|
||||
|
||||
await memory.addChatMessages(
|
||||
[
|
||||
{
|
||||
text: input,
|
||||
type: 'userMessage'
|
||||
},
|
||||
{
|
||||
text: res?.output,
|
||||
type: 'apiMessage'
|
||||
}
|
||||
],
|
||||
this.sessionId
|
||||
)
|
||||
|
||||
return res?.output
|
||||
}
|
||||
}
|
||||
|
||||
const formatAgentSteps = (steps: AgentStep[]): BaseMessage[] =>
|
||||
steps.flatMap(({ action, observation }) => {
|
||||
if ('messageLog' in action && action.messageLog !== undefined) {
|
||||
const log = action.messageLog as BaseMessage[]
|
||||
return log.concat(new FunctionMessage(observation, action.tool))
|
||||
} else {
|
||||
return [new AIMessage(action.log)]
|
||||
}
|
||||
})
|
||||
|
||||
const prepareAgent = (nodeData: INodeData, sessionId?: string) => {
|
||||
const model = nodeData.inputs?.model as ChatOpenAI
|
||||
const memory = nodeData.inputs?.memory
|
||||
const systemMessage = nodeData.inputs?.systemMessage as string
|
||||
let tools = nodeData.inputs?.tools
|
||||
tools = flatten(tools)
|
||||
const memoryKey = memory.memoryKey ?? 'chat_history'
|
||||
|
||||
const prompt = ChatPromptTemplate.fromMessages([
|
||||
['ai', systemMessage ?? `You are a helpful AI assistant.`],
|
||||
new MessagesPlaceholder(memoryKey),
|
||||
['human', '{input}'],
|
||||
new MessagesPlaceholder('agent_scratchpad')
|
||||
])
|
||||
|
||||
const modelWithFunctions = model.bind({
|
||||
functions: [...tools.map((tool: any) => formatToOpenAIFunction(tool))]
|
||||
})
|
||||
|
||||
const runnableAgent = RunnableSequence.from([
|
||||
{
|
||||
input: (i: { input: string; steps: AgentStep[] }) => i.input,
|
||||
agent_scratchpad: (i: { input: string; steps: AgentStep[] }) => formatAgentSteps(i.steps),
|
||||
[memoryKey]: async (_: { input: string; steps: AgentStep[] }) => {
|
||||
const messages: BaseMessage[] = await memory.getChatMessages(sessionId, true)
|
||||
return messages ?? []
|
||||
}
|
||||
},
|
||||
prompt,
|
||||
modelWithFunctions,
|
||||
new OpenAIFunctionsAgentOutputParser()
|
||||
])
|
||||
|
||||
const executor = AgentExecutorExtended.fromAgentAndTools({
|
||||
agent: runnableAgent,
|
||||
tools,
|
||||
sessionId
|
||||
})
|
||||
|
||||
return executor
|
||||
}
|
||||
|
||||
type AgentExecutorOutput = ChainValues
|
||||
|
||||
class AgentExecutorExtended extends AgentExecutor {
|
||||
sessionId?: string
|
||||
|
||||
static fromAgentAndTools(fields: AgentExecutorInput & { sessionId?: string }): AgentExecutorExtended {
|
||||
const newInstance = new AgentExecutorExtended(fields)
|
||||
if (fields.sessionId) newInstance.sessionId = fields.sessionId
|
||||
return newInstance
|
||||
}
|
||||
|
||||
shouldContinueIteration(iterations: number): boolean {
|
||||
return this.maxIterations === undefined || iterations < this.maxIterations
|
||||
}
|
||||
|
||||
async _call(inputs: ChainValues, runManager?: CallbackManagerForChainRun): Promise<AgentExecutorOutput> {
|
||||
const toolsByName = Object.fromEntries(this.tools.map((t) => [t.name.toLowerCase(), t]))
|
||||
|
||||
const steps: AgentStep[] = []
|
||||
let iterations = 0
|
||||
|
||||
const getOutput = async (finishStep: AgentFinish): Promise<AgentExecutorOutput> => {
|
||||
const { returnValues } = finishStep
|
||||
const additional = await this.agent.prepareForOutput(returnValues, steps)
|
||||
|
||||
if (this.returnIntermediateSteps) {
|
||||
return { ...returnValues, intermediateSteps: steps, ...additional }
|
||||
}
|
||||
await runManager?.handleAgentEnd(finishStep)
|
||||
return { ...returnValues, ...additional }
|
||||
}
|
||||
|
||||
while (this.shouldContinueIteration(iterations)) {
|
||||
let output
|
||||
try {
|
||||
output = await this.agent.plan(steps, inputs, runManager?.getChild())
|
||||
} catch (e) {
|
||||
if (e instanceof OutputParserException) {
|
||||
let observation
|
||||
let text = e.message
|
||||
if (this.handleParsingErrors === true) {
|
||||
if (e.sendToLLM) {
|
||||
observation = e.observation
|
||||
text = e.llmOutput ?? ''
|
||||
} else {
|
||||
observation = 'Invalid or incomplete response'
|
||||
}
|
||||
} else if (typeof this.handleParsingErrors === 'string') {
|
||||
observation = this.handleParsingErrors
|
||||
} else if (typeof this.handleParsingErrors === 'function') {
|
||||
observation = this.handleParsingErrors(e)
|
||||
} else {
|
||||
throw e
|
||||
}
|
||||
output = {
|
||||
tool: '_Exception',
|
||||
toolInput: observation,
|
||||
log: text
|
||||
} as AgentAction
|
||||
} else {
|
||||
throw e
|
||||
}
|
||||
}
|
||||
// Check if the agent has finished
|
||||
if ('returnValues' in output) {
|
||||
return getOutput(output)
|
||||
}
|
||||
|
||||
let actions: AgentAction[]
|
||||
if (Array.isArray(output)) {
|
||||
actions = output as AgentAction[]
|
||||
} else {
|
||||
actions = [output as AgentAction]
|
||||
}
|
||||
|
||||
const newSteps = await Promise.all(
|
||||
actions.map(async (action) => {
|
||||
await runManager?.handleAgentAction(action)
|
||||
const tool = action.tool === '_Exception' ? new ExceptionTool() : toolsByName[action.tool?.toLowerCase()]
|
||||
let observation
|
||||
try {
|
||||
// here we need to override Tool call method to include sessionId as parameter
|
||||
observation = tool
|
||||
? // @ts-ignore
|
||||
await tool.call(action.toolInput, runManager?.getChild(), undefined, this.sessionId)
|
||||
: `${action.tool} is not a valid tool, try another one.`
|
||||
} catch (e) {
|
||||
if (e instanceof ToolInputParsingException) {
|
||||
if (this.handleParsingErrors === true) {
|
||||
observation = 'Invalid or incomplete tool input. Please try again.'
|
||||
} else if (typeof this.handleParsingErrors === 'string') {
|
||||
observation = this.handleParsingErrors
|
||||
} else if (typeof this.handleParsingErrors === 'function') {
|
||||
observation = this.handleParsingErrors(e)
|
||||
} else {
|
||||
throw e
|
||||
}
|
||||
observation = await new ExceptionTool().call(observation, runManager?.getChild())
|
||||
return { action, observation: observation ?? '' }
|
||||
}
|
||||
}
|
||||
return { action, observation: observation ?? '' }
|
||||
})
|
||||
)
|
||||
|
||||
steps.push(...newSteps)
|
||||
|
||||
const lastStep = steps[steps.length - 1]
|
||||
const lastTool = toolsByName[lastStep.action.tool?.toLowerCase()]
|
||||
|
||||
if (lastTool?.returnDirect) {
|
||||
return getOutput({
|
||||
returnValues: { [this.agent.returnValues[0]]: lastStep.observation },
|
||||
log: ''
|
||||
})
|
||||
}
|
||||
|
||||
iterations += 1
|
||||
}
|
||||
|
||||
const finish = await this.agent.returnStoppedResponse(this.earlyStoppingMethod, steps, inputs)
|
||||
|
||||
return getOutput(finish)
|
||||
}
|
||||
|
||||
async _takeNextStep(
|
||||
nameToolMap: Record<string, Tool>,
|
||||
inputs: ChainValues,
|
||||
intermediateSteps: AgentStep[],
|
||||
runManager?: CallbackManagerForChainRun
|
||||
): Promise<AgentFinish | AgentStep[]> {
|
||||
let output
|
||||
try {
|
||||
output = await this.agent.plan(intermediateSteps, inputs, runManager?.getChild())
|
||||
} catch (e) {
|
||||
if (e instanceof OutputParserException) {
|
||||
let observation
|
||||
let text = e.message
|
||||
if (this.handleParsingErrors === true) {
|
||||
if (e.sendToLLM) {
|
||||
observation = e.observation
|
||||
text = e.llmOutput ?? ''
|
||||
} else {
|
||||
observation = 'Invalid or incomplete response'
|
||||
}
|
||||
} else if (typeof this.handleParsingErrors === 'string') {
|
||||
observation = this.handleParsingErrors
|
||||
} else if (typeof this.handleParsingErrors === 'function') {
|
||||
observation = this.handleParsingErrors(e)
|
||||
} else {
|
||||
throw e
|
||||
}
|
||||
output = {
|
||||
tool: '_Exception',
|
||||
toolInput: observation,
|
||||
log: text
|
||||
} as AgentAction
|
||||
} else {
|
||||
throw e
|
||||
}
|
||||
}
|
||||
|
||||
if ('returnValues' in output) {
|
||||
return output
|
||||
}
|
||||
|
||||
let actions: AgentAction[]
|
||||
if (Array.isArray(output)) {
|
||||
actions = output as AgentAction[]
|
||||
} else {
|
||||
actions = [output as AgentAction]
|
||||
}
|
||||
|
||||
const result: AgentStep[] = []
|
||||
for (const agentAction of actions) {
|
||||
let observation = ''
|
||||
if (runManager) {
|
||||
await runManager?.handleAgentAction(agentAction)
|
||||
}
|
||||
if (agentAction.tool in nameToolMap) {
|
||||
const tool = nameToolMap[agentAction.tool]
|
||||
try {
|
||||
// here we need to override Tool call method to include sessionId as parameter
|
||||
// @ts-ignore
|
||||
observation = await tool.call(agentAction.toolInput, runManager?.getChild(), undefined, this.sessionId)
|
||||
} catch (e) {
|
||||
if (e instanceof ToolInputParsingException) {
|
||||
if (this.handleParsingErrors === true) {
|
||||
observation = 'Invalid or incomplete tool input. Please try again.'
|
||||
} else if (typeof this.handleParsingErrors === 'string') {
|
||||
observation = this.handleParsingErrors
|
||||
} else if (typeof this.handleParsingErrors === 'function') {
|
||||
observation = this.handleParsingErrors(e)
|
||||
} else {
|
||||
throw e
|
||||
}
|
||||
observation = await new ExceptionTool().call(observation, runManager?.getChild())
|
||||
}
|
||||
}
|
||||
} else {
|
||||
observation = `${agentAction.tool} is not a valid tool, try another available tool: ${Object.keys(nameToolMap).join(', ')}`
|
||||
}
|
||||
result.push({
|
||||
action: agentAction,
|
||||
observation
|
||||
})
|
||||
}
|
||||
return result
|
||||
}
|
||||
}
|
||||
|
||||
class ExceptionTool extends Tool {
|
||||
name = '_Exception'
|
||||
|
||||
description = 'Exception tool'
|
||||
|
||||
async _call(query: string) {
|
||||
return query
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user