Add more nodes for agents, loaders

This commit is contained in:
Henry
2023-04-10 13:56:44 +01:00
parent 05c86ff9c5
commit 58e06718d1
57 changed files with 1584 additions and 89 deletions
+43
View File
@@ -0,0 +1,43 @@
import { INodeData } from 'flowise-components'
import { IActiveChatflows } from './Interface'
/**
* This pool is to keep track of active test triggers (event listeners),
* so we can clear the event listeners whenever user refresh or exit page
*/
export class ChatflowPool {
activeChatflows: IActiveChatflows = {}
/**
* Add to the pool
* @param {string} chatflowid
* @param {INodeData} endingNodeData
*/
add(chatflowid: string, endingNodeData: INodeData) {
this.activeChatflows[chatflowid] = {
endingNodeData,
inSync: true
}
}
/**
* Update to the pool
* @param {string} chatflowid
* @param {boolean} inSync
*/
updateInSync(chatflowid: string, inSync: boolean) {
if (Object.prototype.hasOwnProperty.call(this.activeChatflows, chatflowid)) {
this.activeChatflows[chatflowid].inSync = inSync
}
}
/**
* Remove from the pool
* @param {string} chatflowid
*/
async remove(chatflowid: string) {
if (Object.prototype.hasOwnProperty.call(this.activeChatflows, chatflowid)) {
delete this.activeChatflows[chatflowid]
}
}
}
+14 -2
View File
@@ -22,7 +22,7 @@ export interface IChatMessage {
createdDate: Date
}
export interface IComponentNodesPool {
export interface IComponentNodes {
[key: string]: INode
}
@@ -95,7 +95,19 @@ export interface INodeQueue {
depth: number
}
export interface IMessage {
message: string
type: MessageType
}
export interface IncomingInput {
question: string
history: string[]
history: IMessage[]
}
export interface IActiveChatflows {
[key: string]: {
endingNodeData: INodeData
inSync: boolean
}
}
+2 -2
View File
@@ -1,4 +1,4 @@
import { IComponentNodesPool } from './Interface'
import { IComponentNodes } from './Interface'
import path from 'path'
import { Dirent } from 'fs'
@@ -6,7 +6,7 @@ import { getNodeModulesPackagePath } from './utils'
import { promises } from 'fs'
export class NodesPool {
componentNodes: IComponentNodesPool = {}
componentNodes: IComponentNodes = {}
/**
* Initialize to get all nodes
+49 -27
View File
@@ -3,17 +3,20 @@ import path from 'path'
import cors from 'cors'
import http from 'http'
import { IChatFlow, IComponentNodesPool, IncomingInput, IReactFlowNode, IReactFlowObject } from './Interface'
import { IChatFlow, IncomingInput, IReactFlowNode, IReactFlowObject } from './Interface'
import { getNodeModulesPackagePath, getStartingNode, buildLangchain, getEndingNode, constructGraphs } from './utils'
import { cloneDeep } from 'lodash'
import { getDataSource } from './DataSource'
import { NodesPool } from './NodesPool'
import { ChatFlow } from './entity/ChatFlow'
import { ChatMessage } from './entity/ChatMessage'
import { ChatflowPool } from './ChatflowPool'
import { INodeData } from 'flowise-components'
export class App {
app: express.Application
componentNodes: IComponentNodesPool = {}
nodesPool: NodesPool
chatflowPool: ChatflowPool
AppDataSource = getDataSource()
constructor() {
@@ -26,10 +29,11 @@ export class App {
.then(async () => {
console.info('📦[server]: Data Source has been initialized!')
// Initialize node instances
const nodesPool = new NodesPool()
await nodesPool.initialize()
this.componentNodes = nodesPool.componentNodes
// Initialize pools
this.nodesPool = new NodesPool()
await this.nodesPool.initialize()
this.chatflowPool = new ChatflowPool()
})
.catch((err) => {
console.error('❌[server]: Error during Data Source initialization:', err)
@@ -53,8 +57,8 @@ export class App {
// Get all component nodes
this.app.get('/api/v1/nodes', (req: Request, res: Response) => {
const returnData = []
for (const nodeName in this.componentNodes) {
const clonedNode = cloneDeep(this.componentNodes[nodeName])
for (const nodeName in this.nodesPool.componentNodes) {
const clonedNode = cloneDeep(this.nodesPool.componentNodes[nodeName])
returnData.push(clonedNode)
}
return res.json(returnData)
@@ -62,8 +66,8 @@ export class App {
// Get specific component node via name
this.app.get('/api/v1/nodes/:name', (req: Request, res: Response) => {
if (Object.prototype.hasOwnProperty.call(this.componentNodes, req.params.name)) {
return res.json(this.componentNodes[req.params.name])
if (Object.prototype.hasOwnProperty.call(this.nodesPool.componentNodes, req.params.name)) {
return res.json(this.nodesPool.componentNodes[req.params.name])
} else {
throw new Error(`Node ${req.params.name} not found`)
}
@@ -71,8 +75,8 @@ export class App {
// Returns specific component node icon via name
this.app.get('/api/v1/node-icon/:name', (req: Request, res: Response) => {
if (Object.prototype.hasOwnProperty.call(this.componentNodes, req.params.name)) {
const nodeInstance = this.componentNodes[req.params.name]
if (Object.prototype.hasOwnProperty.call(this.nodesPool.componentNodes, req.params.name)) {
const nodeInstance = this.nodesPool.componentNodes[req.params.name]
if (nodeInstance.icon === undefined) {
throw new Error(`Node ${req.params.name} icon not found`)
}
@@ -137,6 +141,9 @@ export class App {
this.AppDataSource.getRepository(ChatFlow).merge(chatflow, updateChatFlow)
const result = await this.AppDataSource.getRepository(ChatFlow).save(chatflow)
// Update chatflowpool inSync to false, to build Langchain again because data has been changed
this.chatflowPool.updateInSync(chatflow.id, false)
return res.json(result)
})
@@ -183,30 +190,45 @@ export class App {
// Send input message and get prediction result
this.app.post('/api/v1/prediction/:id', async (req: Request, res: Response) => {
try {
const chatflowid = req.params.id
const incomingInput: IncomingInput = req.body
const chatflow = await this.AppDataSource.getRepository(ChatFlow).findOneBy({
id: req.params.id
})
if (!chatflow) return res.status(404).send(`Chatflow ${req.params.id} not found`)
let nodeToExecuteData: INodeData
const flowData = chatflow.flowData
const parsedFlowData: IReactFlowObject = JSON.parse(flowData)
const { graph, nodeDependencies } = constructGraphs(parsedFlowData.nodes, parsedFlowData.edges)
if (
Object.prototype.hasOwnProperty.call(this.chatflowPool.activeChatflows, chatflowid) &&
this.chatflowPool.activeChatflows[chatflowid].inSync
) {
nodeToExecuteData = this.chatflowPool.activeChatflows[chatflowid].endingNodeData
} else {
const chatflow = await this.AppDataSource.getRepository(ChatFlow).findOneBy({
id: chatflowid
})
if (!chatflow) return res.status(404).send(`Chatflow ${chatflowid} not found`)
const startingNodeIds = getStartingNode(nodeDependencies)
const endingNodeId = getEndingNode(nodeDependencies, graph)
if (!endingNodeId) return res.status(500).send(`Ending node must be either Chain or Agent`)
const flowData = chatflow.flowData
const parsedFlowData: IReactFlowObject = JSON.parse(flowData)
const { graph, nodeDependencies } = constructGraphs(parsedFlowData.nodes, parsedFlowData.edges)
const reactFlowNodes = await buildLangchain(startingNodeIds, parsedFlowData.nodes, graph, this.componentNodes)
const nodeToExecute = reactFlowNodes.find((node: IReactFlowNode) => node.id === endingNodeId)
if (!nodeToExecute) return res.status(404).send(`Node ${endingNodeId} not found`)
const startingNodeIds = getStartingNode(nodeDependencies)
const endingNodeId = getEndingNode(nodeDependencies, graph)
if (!endingNodeId) return res.status(500).send(`Ending node must be either Chain or Agent`)
const nodeInstanceFilePath = this.componentNodes[nodeToExecute.data.name].filePath as string
const reactFlowNodes = await buildLangchain(startingNodeIds, parsedFlowData.nodes, graph, this.nodesPool.componentNodes)
const nodeToExecute = reactFlowNodes.find((node: IReactFlowNode) => node.id === endingNodeId)
if (!nodeToExecute) return res.status(404).send(`Node ${endingNodeId} not found`)
nodeToExecuteData = nodeToExecute.data
this.chatflowPool.add(chatflowid, nodeToExecuteData)
}
const nodeInstanceFilePath = this.nodesPool.componentNodes[nodeToExecuteData.name].filePath as string
const nodeModule = await import(nodeInstanceFilePath)
const nodeInstance = new nodeModule.nodeClass()
const result = await nodeInstance.run(nodeToExecute.data, incomingInput.question)
const result = await nodeInstance.run(nodeToExecuteData, incomingInput.question, { chatHistory: incomingInput.history })
return res.json(result)
} catch (e: any) {
+9 -9
View File
@@ -1,7 +1,7 @@
import path from 'path'
import fs from 'fs'
import {
IComponentNodesPool,
IComponentNodes,
IExploredNode,
INodeDependencies,
INodeDirectedGraph,
@@ -98,8 +98,13 @@ export const getStartingNode = (nodeDependencies: INodeDependencies) => {
return startingNodeIds
}
/**
* Get ending node and check if flow is valid
* @param {INodeDependencies} nodeDependencies
* @param {INodeDirectedGraph} graph
*/
export const getEndingNode = (nodeDependencies: INodeDependencies, graph: INodeDirectedGraph) => {
// Find starting node
// Find ending node
let endingNodeId = ''
Object.keys(graph).forEach((nodeId) => {
if (!graph[nodeId].length && nodeDependencies[nodeId] > 0) {
@@ -113,17 +118,14 @@ export const getEndingNode = (nodeDependencies: INodeDependencies, graph: INodeD
* Build langchain from start to end
* @param {string} startingNodeId
* @param {IReactFlowNode[]} reactFlowNodes
* @param {IReactFlowEdge[]} reactFlowEdges
* @param {INodeDirectedGraph} graph
* @param {IComponentNodesPool} componentNodes
* @param {string} clientId
* @param {any} io
* @param {IComponentNodes} componentNodes
*/
export const buildLangchain = async (
startingNodeIds: string[],
reactFlowNodes: IReactFlowNode[],
graph: INodeDirectedGraph,
componentNodes: IComponentNodesPool
componentNodes: IComponentNodes
) => {
const flowNodes = cloneDeep(reactFlowNodes)
@@ -190,8 +192,6 @@ export const buildLangchain = async (
* Get variable value from outputResponses.output
* @param {string} paramValue
* @param {IReactFlowNode[]} reactFlowNodes
* @param {string} key
* @param {number} loopIndex
* @returns {string}
*/
export const getVariableValue = (paramValue: string, reactFlowNodes: IReactFlowNode[]) => {