mirror of
https://github.com/farcasclaudiu/Flowise.git
synced 2026-06-23 15:00:29 +03:00
Merge pull request #1558 from FlowiseAI/feature/Vars-&-Flows-to-CustomFunction
Feature/add $vars and $flow to custom function
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
import { ICommonObject, IDatabaseEntity, INode, INodeData, INodeOptionsValue, INodeParams } from '../../../src/Interface'
|
||||
import { convertSchemaToZod, getBaseClasses } from '../../../src/utils'
|
||||
import { convertSchemaToZod, getBaseClasses, getVars } from '../../../src/utils'
|
||||
import { DynamicStructuredTool } from './core'
|
||||
import { z } from 'zod'
|
||||
import { DataSource } from 'typeorm'
|
||||
@@ -81,23 +81,7 @@ class CustomTool_Tools implements INode {
|
||||
}
|
||||
if (customToolFunc) obj.code = customToolFunc
|
||||
|
||||
const variables = await appDataSource.getRepository(databaseEntities['Variable']).find()
|
||||
|
||||
// override variables defined in overrideConfig
|
||||
// nodeData.inputs.variables is an Object, check each property and override the variable
|
||||
if (nodeData?.inputs?.vars) {
|
||||
for (const propertyName of Object.getOwnPropertyNames(nodeData.inputs.vars)) {
|
||||
const foundVar = variables.find((v) => v.name === propertyName)
|
||||
if (foundVar) {
|
||||
// even if the variable was defined as runtime, we override it with static value
|
||||
foundVar.type = 'static'
|
||||
foundVar.value = nodeData.inputs.vars[propertyName]
|
||||
} else {
|
||||
// add it the variables, if not found locally in the db
|
||||
variables.push({ name: propertyName, type: 'static', value: nodeData.inputs.vars[propertyName] })
|
||||
}
|
||||
}
|
||||
}
|
||||
const variables = await getVars(appDataSource, databaseEntities, nodeData)
|
||||
|
||||
const flow = { chatflowId: options.chatflowid }
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import { z } from 'zod'
|
||||
import { NodeVM } from 'vm2'
|
||||
import { availableDependencies } from '../../../src/utils'
|
||||
import { availableDependencies, defaultAllowBuiltInDep, prepareSandboxVars } from '../../../src/utils'
|
||||
import { RunnableConfig } from '@langchain/core/runnables'
|
||||
import { StructuredTool, ToolParams } from '@langchain/core/tools'
|
||||
import { CallbackManagerForToolRun, Callbacks, CallbackManager, parseCallbackConfigArg } from '@langchain/core/callbacks/manager'
|
||||
@@ -112,48 +112,13 @@ export class DynamicStructuredTool<
|
||||
}
|
||||
}
|
||||
|
||||
// inject variables
|
||||
let vars = {}
|
||||
if (this.variables) {
|
||||
for (const item of this.variables) {
|
||||
let value = item.value
|
||||
|
||||
// read from .env file
|
||||
if (item.type === 'runtime') {
|
||||
value = process.env[item.name]
|
||||
}
|
||||
|
||||
Object.defineProperty(vars, item.name, {
|
||||
enumerable: true,
|
||||
configurable: true,
|
||||
writable: true,
|
||||
value: value
|
||||
})
|
||||
}
|
||||
}
|
||||
sandbox['$vars'] = vars
|
||||
sandbox['$vars'] = prepareSandboxVars(this.variables)
|
||||
|
||||
// inject flow properties
|
||||
if (this.flowObj) {
|
||||
sandbox['$flow'] = { ...this.flowObj, ...flowConfig }
|
||||
}
|
||||
|
||||
const defaultAllowBuiltInDep = [
|
||||
'assert',
|
||||
'buffer',
|
||||
'crypto',
|
||||
'events',
|
||||
'http',
|
||||
'https',
|
||||
'net',
|
||||
'path',
|
||||
'querystring',
|
||||
'timers',
|
||||
'tls',
|
||||
'url',
|
||||
'zlib'
|
||||
]
|
||||
|
||||
const builtinDeps = process.env.TOOL_FUNCTION_BUILTIN_DEP
|
||||
? defaultAllowBuiltInDep.concat(process.env.TOOL_FUNCTION_BUILTIN_DEP.split(','))
|
||||
: defaultAllowBuiltInDep
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import { ICommonObject, INode, INodeData, INodeOutputsValue, INodeParams } from '../../../src/Interface'
|
||||
import { ICommonObject, IDatabaseEntity, INode, INodeData, INodeOutputsValue, INodeParams } from '../../../src/Interface'
|
||||
import { NodeVM } from 'vm2'
|
||||
import { availableDependencies, handleEscapeCharacters } from '../../../src/utils'
|
||||
import { DataSource } from 'typeorm'
|
||||
import { availableDependencies, defaultAllowBuiltInDep, getVars, handleEscapeCharacters, prepareSandboxVars } from '../../../src/utils'
|
||||
|
||||
class CustomFunction_Utilities implements INode {
|
||||
label: string
|
||||
@@ -55,9 +56,19 @@ class CustomFunction_Utilities implements INode {
|
||||
]
|
||||
}
|
||||
|
||||
async init(nodeData: INodeData, input: string): Promise<any> {
|
||||
async init(nodeData: INodeData, input: string, options: ICommonObject): Promise<any> {
|
||||
const javascriptFunction = nodeData.inputs?.javascriptFunction as string
|
||||
const functionInputVariablesRaw = nodeData.inputs?.functionInputVariables
|
||||
const appDataSource = options.appDataSource as DataSource
|
||||
const databaseEntities = options.databaseEntities as IDatabaseEntity
|
||||
|
||||
const variables = await getVars(appDataSource, databaseEntities, nodeData)
|
||||
const flow = {
|
||||
chatflowId: options.chatflowid,
|
||||
sessionId: options.sessionId,
|
||||
chatId: options.chatId,
|
||||
input
|
||||
}
|
||||
|
||||
let inputVars: ICommonObject = {}
|
||||
if (functionInputVariablesRaw) {
|
||||
@@ -70,6 +81,8 @@ class CustomFunction_Utilities implements INode {
|
||||
}
|
||||
|
||||
let sandbox: any = { $input: input }
|
||||
sandbox['$vars'] = prepareSandboxVars(variables)
|
||||
sandbox['$flow'] = flow
|
||||
|
||||
if (Object.keys(inputVars).length) {
|
||||
for (const item in inputVars) {
|
||||
@@ -81,22 +94,6 @@ class CustomFunction_Utilities implements INode {
|
||||
}
|
||||
}
|
||||
|
||||
const defaultAllowBuiltInDep = [
|
||||
'assert',
|
||||
'buffer',
|
||||
'crypto',
|
||||
'events',
|
||||
'http',
|
||||
'https',
|
||||
'net',
|
||||
'path',
|
||||
'querystring',
|
||||
'timers',
|
||||
'tls',
|
||||
'url',
|
||||
'zlib'
|
||||
]
|
||||
|
||||
const builtinDeps = process.env.TOOL_FUNCTION_BUILTIN_DEP
|
||||
? defaultAllowBuiltInDep.concat(process.env.TOOL_FUNCTION_BUILTIN_DEP.split(','))
|
||||
: defaultAllowBuiltInDep
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import { ICommonObject, INode, INodeData, INodeOutputsValue, INodeParams } from '../../../src/Interface'
|
||||
import { ICommonObject, IDatabaseEntity, INode, INodeData, INodeOutputsValue, INodeParams } from '../../../src/Interface'
|
||||
import { NodeVM } from 'vm2'
|
||||
import { availableDependencies } from '../../../src/utils'
|
||||
import { DataSource } from 'typeorm'
|
||||
import { availableDependencies, defaultAllowBuiltInDep, getVars, prepareSandboxVars } from '../../../src/utils'
|
||||
|
||||
class IfElseFunction_Utilities implements INode {
|
||||
label: string
|
||||
@@ -73,10 +74,20 @@ class IfElseFunction_Utilities implements INode {
|
||||
]
|
||||
}
|
||||
|
||||
async init(nodeData: INodeData, input: string): Promise<any> {
|
||||
async init(nodeData: INodeData, input: string, options: ICommonObject): Promise<any> {
|
||||
const ifFunction = nodeData.inputs?.ifFunction as string
|
||||
const elseFunction = nodeData.inputs?.elseFunction as string
|
||||
const functionInputVariablesRaw = nodeData.inputs?.functionInputVariables
|
||||
const appDataSource = options.appDataSource as DataSource
|
||||
const databaseEntities = options.databaseEntities as IDatabaseEntity
|
||||
|
||||
const variables = await getVars(appDataSource, databaseEntities, nodeData)
|
||||
const flow = {
|
||||
chatflowId: options.chatflowid,
|
||||
sessionId: options.sessionId,
|
||||
chatId: options.chatId,
|
||||
input
|
||||
}
|
||||
|
||||
let inputVars: ICommonObject = {}
|
||||
if (functionInputVariablesRaw) {
|
||||
@@ -89,6 +100,8 @@ class IfElseFunction_Utilities implements INode {
|
||||
}
|
||||
|
||||
let sandbox: any = { $input: input }
|
||||
sandbox['$vars'] = prepareSandboxVars(variables)
|
||||
sandbox['$flow'] = flow
|
||||
|
||||
if (Object.keys(inputVars).length) {
|
||||
for (const item in inputVars) {
|
||||
@@ -96,22 +109,6 @@ class IfElseFunction_Utilities implements INode {
|
||||
}
|
||||
}
|
||||
|
||||
const defaultAllowBuiltInDep = [
|
||||
'assert',
|
||||
'buffer',
|
||||
'crypto',
|
||||
'events',
|
||||
'http',
|
||||
'https',
|
||||
'net',
|
||||
'path',
|
||||
'querystring',
|
||||
'timers',
|
||||
'tls',
|
||||
'url',
|
||||
'zlib'
|
||||
]
|
||||
|
||||
const builtinDeps = process.env.TOOL_FUNCTION_BUILTIN_DEP
|
||||
? defaultAllowBuiltInDep.concat(process.env.TOOL_FUNCTION_BUILTIN_DEP.split(','))
|
||||
: defaultAllowBuiltInDep
|
||||
|
||||
@@ -29,6 +29,12 @@ export interface ICommonObject {
|
||||
[key: string]: any | CommonType | ICommonObject | CommonType[] | ICommonObject[]
|
||||
}
|
||||
|
||||
export interface IVariable {
|
||||
name: string
|
||||
value: string
|
||||
type: string
|
||||
}
|
||||
|
||||
export type IDatabaseEntity = {
|
||||
[key: string]: any
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@ import * as path from 'path'
|
||||
import { JSDOM } from 'jsdom'
|
||||
import { z } from 'zod'
|
||||
import { DataSource } from 'typeorm'
|
||||
import { ICommonObject, IDatabaseEntity, IMessage, INodeData } from './Interface'
|
||||
import { ICommonObject, IDatabaseEntity, IMessage, INodeData, IVariable } from './Interface'
|
||||
import { AES, enc } from 'crypto-js'
|
||||
import { ChatMessageHistory } from 'langchain/memory'
|
||||
import { AIMessage, HumanMessage, BaseMessage } from 'langchain/schema'
|
||||
@@ -70,6 +70,22 @@ export const availableDependencies = [
|
||||
'weaviate-ts-client'
|
||||
]
|
||||
|
||||
export const defaultAllowBuiltInDep = [
|
||||
'assert',
|
||||
'buffer',
|
||||
'crypto',
|
||||
'events',
|
||||
'http',
|
||||
'https',
|
||||
'net',
|
||||
'path',
|
||||
'querystring',
|
||||
'timers',
|
||||
'tls',
|
||||
'url',
|
||||
'zlib'
|
||||
]
|
||||
|
||||
/**
|
||||
* Get base classes of components
|
||||
*
|
||||
@@ -688,3 +704,57 @@ export const convertMultiOptionsToStringArray = (inputString: string): string[]
|
||||
}
|
||||
return ArrayString
|
||||
}
|
||||
|
||||
/**
|
||||
* Get variables
|
||||
* @param {DataSource} appDataSource
|
||||
* @param {IDatabaseEntity} databaseEntities
|
||||
* @param {INodeData} nodeData
|
||||
*/
|
||||
export const getVars = async (appDataSource: DataSource, databaseEntities: IDatabaseEntity, nodeData: INodeData) => {
|
||||
const variables = ((await appDataSource.getRepository(databaseEntities['Variable']).find()) as IVariable[]) ?? []
|
||||
|
||||
// override variables defined in overrideConfig
|
||||
// nodeData.inputs.variables is an Object, check each property and override the variable
|
||||
if (nodeData?.inputs?.vars) {
|
||||
for (const propertyName of Object.getOwnPropertyNames(nodeData.inputs.vars)) {
|
||||
const foundVar = variables.find((v) => v.name === propertyName)
|
||||
if (foundVar) {
|
||||
// even if the variable was defined as runtime, we override it with static value
|
||||
foundVar.type = 'static'
|
||||
foundVar.value = nodeData.inputs.vars[propertyName]
|
||||
} else {
|
||||
// add it the variables, if not found locally in the db
|
||||
variables.push({ name: propertyName, type: 'static', value: nodeData.inputs.vars[propertyName] })
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return variables
|
||||
}
|
||||
|
||||
/**
|
||||
* Prepare sandbox variables
|
||||
* @param {IVariable[]} variables
|
||||
*/
|
||||
export const prepareSandboxVars = (variables: IVariable[]) => {
|
||||
let vars = {}
|
||||
if (variables) {
|
||||
for (const item of variables) {
|
||||
let value = item.value
|
||||
|
||||
// read from .env file
|
||||
if (item.type === 'runtime') {
|
||||
value = process.env[item.name] ?? ''
|
||||
}
|
||||
|
||||
Object.defineProperty(vars, item.name, {
|
||||
enumerable: true,
|
||||
configurable: true,
|
||||
writable: true,
|
||||
value: value
|
||||
})
|
||||
}
|
||||
}
|
||||
return vars
|
||||
}
|
||||
|
||||
@@ -294,7 +294,13 @@ export class App {
|
||||
const nodeModule = await import(nodeInstanceFilePath)
|
||||
const newNodeInstance = new nodeModule.nodeClass()
|
||||
|
||||
const returnData = await newNodeInstance.init(nodeData)
|
||||
const options: ICommonObject = {
|
||||
appDataSource: this.AppDataSource,
|
||||
databaseEntities,
|
||||
logger
|
||||
}
|
||||
|
||||
const returnData = await newNodeInstance.init(nodeData, '', options)
|
||||
const result = typeof returnData === 'string' ? handleEscapeCharacters(returnData, true) : returnData
|
||||
|
||||
return res.json(result)
|
||||
@@ -1448,6 +1454,11 @@ export class App {
|
||||
let chatId = incomingInput.chatId ?? ''
|
||||
let isUpsert = true
|
||||
|
||||
// Get session ID
|
||||
const memoryNode = findMemoryNode(nodes, edges)
|
||||
let sessionId = undefined
|
||||
if (memoryNode) sessionId = getMemorySessionId(memoryNode, incomingInput, chatId, isInternal)
|
||||
|
||||
const vsNodes = nodes.filter(
|
||||
(node) =>
|
||||
node.data.category === 'Vector Stores' &&
|
||||
@@ -1485,6 +1496,7 @@ export class App {
|
||||
incomingInput.question,
|
||||
chatHistory,
|
||||
chatId,
|
||||
sessionId ?? '',
|
||||
chatflowid,
|
||||
this.AppDataSource,
|
||||
incomingInput?.overrideConfig,
|
||||
@@ -1562,6 +1574,12 @@ export class App {
|
||||
const nodes = parsedFlowData.nodes
|
||||
const edges = parsedFlowData.edges
|
||||
|
||||
// Get session ID
|
||||
const memoryNode = findMemoryNode(nodes, edges)
|
||||
const memoryType = memoryNode?.data.label
|
||||
let sessionId = undefined
|
||||
if (memoryNode) sessionId = getMemorySessionId(memoryNode, incomingInput, chatId, isInternal)
|
||||
|
||||
/* Reuse the flow without having to rebuild (to avoid duplicated upsert, recomputation, reinitialization of memory) when all these conditions met:
|
||||
* - Node Data already exists in pool
|
||||
* - Still in sync (i.e the flow has not been modified since)
|
||||
@@ -1671,6 +1689,7 @@ export class App {
|
||||
incomingInput.question,
|
||||
chatHistory,
|
||||
chatId,
|
||||
sessionId ?? '',
|
||||
chatflowid,
|
||||
this.AppDataSource,
|
||||
incomingInput?.overrideConfig,
|
||||
@@ -1700,12 +1719,6 @@ export class App {
|
||||
|
||||
logger.debug(`[server]: Running ${nodeToExecuteData.label} (${nodeToExecuteData.id})`)
|
||||
|
||||
const memoryNode = findMemoryNode(nodes, edges)
|
||||
const memoryType = memoryNode?.data.label
|
||||
|
||||
let sessionId = undefined
|
||||
if (memoryNode) sessionId = getMemorySessionId(memoryNode, incomingInput, chatId, isInternal)
|
||||
|
||||
const nodeInstanceFilePath = this.nodesPool.componentNodes[nodeToExecuteData.name].filePath as string
|
||||
const nodeModule = await import(nodeInstanceFilePath)
|
||||
const nodeInstance = new nodeModule.nodeClass({ sessionId })
|
||||
|
||||
@@ -273,6 +273,7 @@ export const buildLangchain = async (
|
||||
question: string,
|
||||
chatHistory: IMessage[],
|
||||
chatId: string,
|
||||
sessionId: string,
|
||||
chatflowid: string,
|
||||
appDataSource: DataSource,
|
||||
overrideConfig?: ICommonObject,
|
||||
@@ -317,6 +318,7 @@ export const buildLangchain = async (
|
||||
logger.debug(`[server]: Upserting ${reactFlowNode.data.label} (${reactFlowNode.data.id})`)
|
||||
await newNodeInstance.vectorStoreMethods!['upsert']!.call(newNodeInstance, reactFlowNodeData, {
|
||||
chatId,
|
||||
sessionId,
|
||||
chatflowid,
|
||||
chatHistory,
|
||||
logger,
|
||||
@@ -331,6 +333,7 @@ export const buildLangchain = async (
|
||||
logger.debug(`[server]: Initializing ${reactFlowNode.data.label} (${reactFlowNode.data.id})`)
|
||||
let outputResult = await newNodeInstance.init(reactFlowNodeData, question, {
|
||||
chatId,
|
||||
sessionId,
|
||||
chatflowid,
|
||||
chatHistory,
|
||||
logger,
|
||||
|
||||
Reference in New Issue
Block a user