From 4256655c7bace928ff4bc8e12ebe81696c9304f1 Mon Sep 17 00:00:00 2001 From: Henry Date: Wed, 17 Jan 2024 23:47:11 +0000 Subject: [PATCH] add $vars and $flow to custom function --- .../nodes/tools/CustomTool/CustomTool.ts | 20 +----- .../components/nodes/tools/CustomTool/core.ts | 39 +--------- .../CustomFunction/CustomFunction.ts | 35 +++++---- .../IfElseFunction/IfElseFunction.ts | 35 +++++---- packages/components/src/Interface.ts | 6 ++ packages/components/src/utils.ts | 72 ++++++++++++++++++- packages/server/src/index.ts | 27 +++++-- packages/server/src/utils/index.ts | 3 + 8 files changed, 136 insertions(+), 101 deletions(-) diff --git a/packages/components/nodes/tools/CustomTool/CustomTool.ts b/packages/components/nodes/tools/CustomTool/CustomTool.ts index a983d0d9..6ba5bc26 100644 --- a/packages/components/nodes/tools/CustomTool/CustomTool.ts +++ b/packages/components/nodes/tools/CustomTool/CustomTool.ts @@ -1,5 +1,5 @@ import { ICommonObject, IDatabaseEntity, INode, INodeData, INodeOptionsValue, INodeParams } from '../../../src/Interface' -import { convertSchemaToZod, getBaseClasses } from '../../../src/utils' +import { convertSchemaToZod, getBaseClasses, getVars } from '../../../src/utils' import { DynamicStructuredTool } from './core' import { z } from 'zod' import { DataSource } from 'typeorm' @@ -81,23 +81,7 @@ class CustomTool_Tools implements INode { } if (customToolFunc) obj.code = customToolFunc - const variables = await appDataSource.getRepository(databaseEntities['Variable']).find() - - // override variables defined in overrideConfig - // nodeData.inputs.variables is an Object, check each property and override the variable - if (nodeData?.inputs?.vars) { - for (const propertyName of Object.getOwnPropertyNames(nodeData.inputs.vars)) { - const foundVar = variables.find((v) => v.name === propertyName) - if (foundVar) { - // even if the variable was defined as runtime, we override it with static value - foundVar.type = 'static' - foundVar.value = nodeData.inputs.vars[propertyName] - } else { - // add it the variables, if not found locally in the db - variables.push({ name: propertyName, type: 'static', value: nodeData.inputs.vars[propertyName] }) - } - } - } + const variables = await getVars(appDataSource, databaseEntities, nodeData) const flow = { chatflowId: options.chatflowid } diff --git a/packages/components/nodes/tools/CustomTool/core.ts b/packages/components/nodes/tools/CustomTool/core.ts index b543aefa..19be88f1 100644 --- a/packages/components/nodes/tools/CustomTool/core.ts +++ b/packages/components/nodes/tools/CustomTool/core.ts @@ -1,6 +1,6 @@ import { z } from 'zod' import { NodeVM } from 'vm2' -import { availableDependencies } from '../../../src/utils' +import { availableDependencies, defaultAllowBuiltInDep, prepareSandboxVars } from '../../../src/utils' import { RunnableConfig } from '@langchain/core/runnables' import { StructuredTool, ToolParams } from '@langchain/core/tools' import { CallbackManagerForToolRun, Callbacks, CallbackManager, parseCallbackConfigArg } from '@langchain/core/callbacks/manager' @@ -112,48 +112,13 @@ export class DynamicStructuredTool< } } - // inject variables - let vars = {} - if (this.variables) { - for (const item of this.variables) { - let value = item.value - - // read from .env file - if (item.type === 'runtime') { - value = process.env[item.name] - } - - Object.defineProperty(vars, item.name, { - enumerable: true, - configurable: true, - writable: true, - value: value - }) - } - } - sandbox['$vars'] = vars + sandbox['$vars'] = prepareSandboxVars(this.variables) // inject flow properties if (this.flowObj) { sandbox['$flow'] = { ...this.flowObj, ...flowConfig } } - const defaultAllowBuiltInDep = [ - 'assert', - 'buffer', - 'crypto', - 'events', - 'http', - 'https', - 'net', - 'path', - 'querystring', - 'timers', - 'tls', - 'url', - 'zlib' - ] - const builtinDeps = process.env.TOOL_FUNCTION_BUILTIN_DEP ? defaultAllowBuiltInDep.concat(process.env.TOOL_FUNCTION_BUILTIN_DEP.split(',')) : defaultAllowBuiltInDep diff --git a/packages/components/nodes/utilities/CustomFunction/CustomFunction.ts b/packages/components/nodes/utilities/CustomFunction/CustomFunction.ts index 749c3a86..ff29d589 100644 --- a/packages/components/nodes/utilities/CustomFunction/CustomFunction.ts +++ b/packages/components/nodes/utilities/CustomFunction/CustomFunction.ts @@ -1,6 +1,7 @@ -import { ICommonObject, INode, INodeData, INodeOutputsValue, INodeParams } from '../../../src/Interface' +import { ICommonObject, IDatabaseEntity, INode, INodeData, INodeOutputsValue, INodeParams } from '../../../src/Interface' import { NodeVM } from 'vm2' -import { availableDependencies, handleEscapeCharacters } from '../../../src/utils' +import { DataSource } from 'typeorm' +import { availableDependencies, defaultAllowBuiltInDep, getVars, handleEscapeCharacters, prepareSandboxVars } from '../../../src/utils' class CustomFunction_Utilities implements INode { label: string @@ -55,9 +56,19 @@ class CustomFunction_Utilities implements INode { ] } - async init(nodeData: INodeData, input: string): Promise { + async init(nodeData: INodeData, input: string, options: ICommonObject): Promise { const javascriptFunction = nodeData.inputs?.javascriptFunction as string const functionInputVariablesRaw = nodeData.inputs?.functionInputVariables + const appDataSource = options.appDataSource as DataSource + const databaseEntities = options.databaseEntities as IDatabaseEntity + + const variables = await getVars(appDataSource, databaseEntities, nodeData) + const flow = { + chatflowId: options.chatflowid, + sessionId: options.sessionId, + chatId: options.chatId, + input + } let inputVars: ICommonObject = {} if (functionInputVariablesRaw) { @@ -70,6 +81,8 @@ class CustomFunction_Utilities implements INode { } let sandbox: any = { $input: input } + sandbox['$vars'] = prepareSandboxVars(variables) + sandbox['$flow'] = flow if (Object.keys(inputVars).length) { for (const item in inputVars) { @@ -81,22 +94,6 @@ class CustomFunction_Utilities implements INode { } } - const defaultAllowBuiltInDep = [ - 'assert', - 'buffer', - 'crypto', - 'events', - 'http', - 'https', - 'net', - 'path', - 'querystring', - 'timers', - 'tls', - 'url', - 'zlib' - ] - const builtinDeps = process.env.TOOL_FUNCTION_BUILTIN_DEP ? defaultAllowBuiltInDep.concat(process.env.TOOL_FUNCTION_BUILTIN_DEP.split(',')) : defaultAllowBuiltInDep diff --git a/packages/components/nodes/utilities/IfElseFunction/IfElseFunction.ts b/packages/components/nodes/utilities/IfElseFunction/IfElseFunction.ts index 862521eb..55339e1a 100644 --- a/packages/components/nodes/utilities/IfElseFunction/IfElseFunction.ts +++ b/packages/components/nodes/utilities/IfElseFunction/IfElseFunction.ts @@ -1,6 +1,7 @@ -import { ICommonObject, INode, INodeData, INodeOutputsValue, INodeParams } from '../../../src/Interface' +import { ICommonObject, IDatabaseEntity, INode, INodeData, INodeOutputsValue, INodeParams } from '../../../src/Interface' import { NodeVM } from 'vm2' -import { availableDependencies } from '../../../src/utils' +import { DataSource } from 'typeorm' +import { availableDependencies, defaultAllowBuiltInDep, getVars, prepareSandboxVars } from '../../../src/utils' class IfElseFunction_Utilities implements INode { label: string @@ -73,10 +74,20 @@ class IfElseFunction_Utilities implements INode { ] } - async init(nodeData: INodeData, input: string): Promise { + async init(nodeData: INodeData, input: string, options: ICommonObject): Promise { const ifFunction = nodeData.inputs?.ifFunction as string const elseFunction = nodeData.inputs?.elseFunction as string const functionInputVariablesRaw = nodeData.inputs?.functionInputVariables + const appDataSource = options.appDataSource as DataSource + const databaseEntities = options.databaseEntities as IDatabaseEntity + + const variables = await getVars(appDataSource, databaseEntities, nodeData) + const flow = { + chatflowId: options.chatflowid, + sessionId: options.sessionId, + chatId: options.chatId, + input + } let inputVars: ICommonObject = {} if (functionInputVariablesRaw) { @@ -89,6 +100,8 @@ class IfElseFunction_Utilities implements INode { } let sandbox: any = { $input: input } + sandbox['$vars'] = prepareSandboxVars(variables) + sandbox['$flow'] = flow if (Object.keys(inputVars).length) { for (const item in inputVars) { @@ -96,22 +109,6 @@ class IfElseFunction_Utilities implements INode { } } - const defaultAllowBuiltInDep = [ - 'assert', - 'buffer', - 'crypto', - 'events', - 'http', - 'https', - 'net', - 'path', - 'querystring', - 'timers', - 'tls', - 'url', - 'zlib' - ] - const builtinDeps = process.env.TOOL_FUNCTION_BUILTIN_DEP ? defaultAllowBuiltInDep.concat(process.env.TOOL_FUNCTION_BUILTIN_DEP.split(',')) : defaultAllowBuiltInDep diff --git a/packages/components/src/Interface.ts b/packages/components/src/Interface.ts index d74ba1b4..fe08f070 100644 --- a/packages/components/src/Interface.ts +++ b/packages/components/src/Interface.ts @@ -29,6 +29,12 @@ export interface ICommonObject { [key: string]: any | CommonType | ICommonObject | CommonType[] | ICommonObject[] } +export interface IVariable { + name: string + value: string + type: string +} + export type IDatabaseEntity = { [key: string]: any } diff --git a/packages/components/src/utils.ts b/packages/components/src/utils.ts index 2215eb41..7e9a68eb 100644 --- a/packages/components/src/utils.ts +++ b/packages/components/src/utils.ts @@ -5,7 +5,7 @@ import * as path from 'path' import { JSDOM } from 'jsdom' import { z } from 'zod' import { DataSource } from 'typeorm' -import { ICommonObject, IDatabaseEntity, IMessage, INodeData } from './Interface' +import { ICommonObject, IDatabaseEntity, IMessage, INodeData, IVariable } from './Interface' import { AES, enc } from 'crypto-js' import { ChatMessageHistory } from 'langchain/memory' import { AIMessage, HumanMessage, BaseMessage } from 'langchain/schema' @@ -70,6 +70,22 @@ export const availableDependencies = [ 'weaviate-ts-client' ] +export const defaultAllowBuiltInDep = [ + 'assert', + 'buffer', + 'crypto', + 'events', + 'http', + 'https', + 'net', + 'path', + 'querystring', + 'timers', + 'tls', + 'url', + 'zlib' +] + /** * Get base classes of components * @@ -688,3 +704,57 @@ export const convertMultiOptionsToStringArray = (inputString: string): string[] } return ArrayString } + +/** + * Get variables + * @param {DataSource} appDataSource + * @param {IDatabaseEntity} databaseEntities + * @param {INodeData} nodeData + */ +export const getVars = async (appDataSource: DataSource, databaseEntities: IDatabaseEntity, nodeData: INodeData) => { + const variables = ((await appDataSource.getRepository(databaseEntities['Variable']).find()) as IVariable[]) ?? [] + + // override variables defined in overrideConfig + // nodeData.inputs.variables is an Object, check each property and override the variable + if (nodeData?.inputs?.vars) { + for (const propertyName of Object.getOwnPropertyNames(nodeData.inputs.vars)) { + const foundVar = variables.find((v) => v.name === propertyName) + if (foundVar) { + // even if the variable was defined as runtime, we override it with static value + foundVar.type = 'static' + foundVar.value = nodeData.inputs.vars[propertyName] + } else { + // add it the variables, if not found locally in the db + variables.push({ name: propertyName, type: 'static', value: nodeData.inputs.vars[propertyName] }) + } + } + } + + return variables +} + +/** + * Prepare sandbox variables + * @param {IVariable[]} variables + */ +export const prepareSandboxVars = (variables: IVariable[]) => { + let vars = {} + if (variables) { + for (const item of variables) { + let value = item.value + + // read from .env file + if (item.type === 'runtime') { + value = process.env[item.name] ?? '' + } + + Object.defineProperty(vars, item.name, { + enumerable: true, + configurable: true, + writable: true, + value: value + }) + } + } + return vars +} diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts index 94a3b538..1986d207 100644 --- a/packages/server/src/index.ts +++ b/packages/server/src/index.ts @@ -294,7 +294,13 @@ export class App { const nodeModule = await import(nodeInstanceFilePath) const newNodeInstance = new nodeModule.nodeClass() - const returnData = await newNodeInstance.init(nodeData) + const options: ICommonObject = { + appDataSource: this.AppDataSource, + databaseEntities, + logger + } + + const returnData = await newNodeInstance.init(nodeData, '', options) const result = typeof returnData === 'string' ? handleEscapeCharacters(returnData, true) : returnData return res.json(result) @@ -1448,6 +1454,11 @@ export class App { let chatId = incomingInput.chatId ?? '' let isUpsert = true + // Get session ID + const memoryNode = findMemoryNode(nodes, edges) + let sessionId = undefined + if (memoryNode) sessionId = getMemorySessionId(memoryNode, incomingInput, chatId, isInternal) + const vsNodes = nodes.filter( (node) => node.data.category === 'Vector Stores' && @@ -1485,6 +1496,7 @@ export class App { incomingInput.question, chatHistory, chatId, + sessionId ?? '', chatflowid, this.AppDataSource, incomingInput?.overrideConfig, @@ -1562,6 +1574,12 @@ export class App { const nodes = parsedFlowData.nodes const edges = parsedFlowData.edges + // Get session ID + const memoryNode = findMemoryNode(nodes, edges) + const memoryType = memoryNode?.data.label + let sessionId = undefined + if (memoryNode) sessionId = getMemorySessionId(memoryNode, incomingInput, chatId, isInternal) + /* Reuse the flow without having to rebuild (to avoid duplicated upsert, recomputation, reinitialization of memory) when all these conditions met: * - Node Data already exists in pool * - Still in sync (i.e the flow has not been modified since) @@ -1671,6 +1689,7 @@ export class App { incomingInput.question, chatHistory, chatId, + sessionId ?? '', chatflowid, this.AppDataSource, incomingInput?.overrideConfig, @@ -1700,12 +1719,6 @@ export class App { logger.debug(`[server]: Running ${nodeToExecuteData.label} (${nodeToExecuteData.id})`) - const memoryNode = findMemoryNode(nodes, edges) - const memoryType = memoryNode?.data.label - - let sessionId = undefined - if (memoryNode) sessionId = getMemorySessionId(memoryNode, incomingInput, chatId, isInternal) - const nodeInstanceFilePath = this.nodesPool.componentNodes[nodeToExecuteData.name].filePath as string const nodeModule = await import(nodeInstanceFilePath) const nodeInstance = new nodeModule.nodeClass({ sessionId }) diff --git a/packages/server/src/utils/index.ts b/packages/server/src/utils/index.ts index dafe612c..2d6acf58 100644 --- a/packages/server/src/utils/index.ts +++ b/packages/server/src/utils/index.ts @@ -273,6 +273,7 @@ export const buildLangchain = async ( question: string, chatHistory: IMessage[], chatId: string, + sessionId: string, chatflowid: string, appDataSource: DataSource, overrideConfig?: ICommonObject, @@ -317,6 +318,7 @@ export const buildLangchain = async ( logger.debug(`[server]: Upserting ${reactFlowNode.data.label} (${reactFlowNode.data.id})`) await newNodeInstance.vectorStoreMethods!['upsert']!.call(newNodeInstance, reactFlowNodeData, { chatId, + sessionId, chatflowid, chatHistory, logger, @@ -331,6 +333,7 @@ export const buildLangchain = async ( logger.debug(`[server]: Initializing ${reactFlowNode.data.label} (${reactFlowNode.data.id})`) let outputResult = await newNodeInstance.init(reactFlowNodeData, question, { chatId, + sessionId, chatflowid, chatHistory, logger,