add logs to component chains/agents

This commit is contained in:
Henry
2023-07-11 01:53:22 +01:00
parent 2bcc2f90b8
commit eb19c206cf
23 changed files with 414 additions and 199 deletions
+4
View File
@@ -1,5 +1,6 @@
import { ICommonObject } from 'flowise-components'
import { IActiveChatflows, INodeData, IReactFlowNode } from './Interface'
import logger from './utils/logger'
/**
* This pool is to keep track of active chatflow pools
@@ -22,6 +23,7 @@ export class ChatflowPool {
inSync: true
}
if (overrideConfig) this.activeChatflows[chatflowid].overrideConfig = overrideConfig
logger.info(`[server]: Chatflow ${chatflowid} added into ChatflowPool`)
}
/**
@@ -32,6 +34,7 @@ export class ChatflowPool {
updateInSync(chatflowid: string, inSync: boolean) {
if (Object.prototype.hasOwnProperty.call(this.activeChatflows, chatflowid)) {
this.activeChatflows[chatflowid].inSync = inSync
logger.info(`[server]: Chatflow ${chatflowid} updated inSync=${inSync} in ChatflowPool`)
}
}
@@ -42,6 +45,7 @@ export class ChatflowPool {
async remove(chatflowid: string) {
if (Object.prototype.hasOwnProperty.call(this.activeChatflows, chatflowid)) {
delete this.activeChatflows[chatflowid]
logger.info(`[server]: Chatflow ${chatflowid} removed from ChatflowPool`)
}
}
}
+95 -81
View File
@@ -5,6 +5,7 @@ import { DataSource } from 'typeorm'
import { ChatFlow } from './entity/ChatFlow'
import { ChatMessage } from './entity/ChatMessage'
import { Tool } from './entity/Tool'
import logger from './utils/logger'
export class ChildProcess {
/**
@@ -27,99 +28,112 @@ export class ChildProcess {
await sendToParentProcess('start', '_')
const childAppDataSource = await initDB()
try {
const childAppDataSource = await initDB()
// Create a Queue and add our initial node in it
const { endingNodeData, chatflow, chatId, incomingInput, componentNodes } = messageValue
// Create a Queue and add our initial node in it
const { endingNodeData, chatflow, chatId, incomingInput, componentNodes } = messageValue
let nodeToExecuteData: INodeData
let addToChatFlowPool: any = {}
let nodeToExecuteData: INodeData
let addToChatFlowPool: any = {}
/* Don't rebuild the flow (to avoid duplicated upsert, recomputation) when all these conditions met:
* - Node Data already exists in pool
* - Still in sync (i.e the flow has not been modified since)
* - Existing overrideConfig and new overrideConfig are the same
* - Flow doesn't start with nodes that depend on incomingInput.question
***/
if (endingNodeData) {
nodeToExecuteData = endingNodeData
} else {
/*** Get chatflows and prepare data ***/
const flowData = chatflow.flowData
const parsedFlowData: IReactFlowObject = JSON.parse(flowData)
const nodes = parsedFlowData.nodes
const edges = parsedFlowData.edges
/* Don't rebuild the flow (to avoid duplicated upsert, recomputation) when all these conditions met:
* - Node Data already exists in pool
* - Still in sync (i.e the flow has not been modified since)
* - Existing overrideConfig and new overrideConfig are the same
* - Flow doesn't start with nodes that depend on incomingInput.question
***/
if (endingNodeData) {
nodeToExecuteData = endingNodeData
} else {
/*** Get chatflows and prepare data ***/
const flowData = chatflow.flowData
const parsedFlowData: IReactFlowObject = JSON.parse(flowData)
const nodes = parsedFlowData.nodes
const edges = parsedFlowData.edges
/*** Get Ending Node with Directed Graph ***/
const { graph, nodeDependencies } = constructGraphs(nodes, edges)
const directedGraph = graph
const endingNodeId = getEndingNode(nodeDependencies, directedGraph)
if (!endingNodeId) {
await sendToParentProcess('error', `Ending node must be either a Chain or Agent`)
return
}
/*** Get Ending Node with Directed Graph ***/
const { graph, nodeDependencies } = constructGraphs(nodes, edges)
const directedGraph = graph
const endingNodeId = getEndingNode(nodeDependencies, directedGraph)
if (!endingNodeId) {
await sendToParentProcess('error', `Ending node ${endingNodeId} not found`)
return
}
const endingNodeData = nodes.find((nd) => nd.id === endingNodeId)?.data
if (!endingNodeData) {
await sendToParentProcess('error', `Ending node must be either a Chain or Agent`)
return
}
const endingNodeData = nodes.find((nd) => nd.id === endingNodeId)?.data
if (!endingNodeData) {
await sendToParentProcess('error', `Ending node ${endingNodeId} data not found`)
return
}
if (
endingNodeData.outputs &&
Object.keys(endingNodeData.outputs).length &&
!Object.values(endingNodeData.outputs).includes(endingNodeData.name)
) {
await sendToParentProcess(
'error',
`Output of ${endingNodeData.label} (${endingNodeData.id}) must be ${endingNodeData.label}, can't be an Output Prediction`
if (endingNodeData && endingNodeData.category !== 'Chains' && endingNodeData.category !== 'Agents') {
await sendToParentProcess('error', `Ending node must be either a Chain or Agent`)
return
}
if (
endingNodeData.outputs &&
Object.keys(endingNodeData.outputs).length &&
!Object.values(endingNodeData.outputs).includes(endingNodeData.name)
) {
await sendToParentProcess(
'error',
`Output of ${endingNodeData.label} (${endingNodeData.id}) must be ${endingNodeData.label}, can't be an Output Prediction`
)
return
}
/*** Get Starting Nodes with Non-Directed Graph ***/
const constructedObj = constructGraphs(nodes, edges, true)
const nonDirectedGraph = constructedObj.graph
const { startingNodeIds, depthQueue } = getStartingNodes(nonDirectedGraph, endingNodeId)
logger.debug(`[server] [mode:child]: Start building chatflow ${chatflow.id}`)
/*** BFS to traverse from Starting Nodes to Ending Node ***/
const reactFlowNodes = await buildLangchain(
startingNodeIds,
nodes,
graph,
depthQueue,
componentNodes,
incomingInput.question,
chatId,
childAppDataSource,
incomingInput?.overrideConfig
)
return
const nodeToExecute = reactFlowNodes.find((node: IReactFlowNode) => node.id === endingNodeId)
if (!nodeToExecute) {
await sendToParentProcess('error', `Node ${endingNodeId} not found`)
return
}
const reactFlowNodeData: INodeData = resolveVariables(nodeToExecute.data, reactFlowNodes, incomingInput.question)
nodeToExecuteData = reactFlowNodeData
const startingNodes = nodes.filter((nd) => startingNodeIds.includes(nd.id))
addToChatFlowPool = {
chatflowid: chatflow.id,
nodeToExecuteData,
startingNodes,
overrideConfig: incomingInput?.overrideConfig
}
}
/*** Get Starting Nodes with Non-Directed Graph ***/
const constructedObj = constructGraphs(nodes, edges, true)
const nonDirectedGraph = constructedObj.graph
const { startingNodeIds, depthQueue } = getStartingNodes(nonDirectedGraph, endingNodeId)
const nodeInstanceFilePath = componentNodes[nodeToExecuteData.name].filePath as string
const nodeModule = await import(nodeInstanceFilePath)
const nodeInstance = new nodeModule.nodeClass()
/*** BFS to traverse from Starting Nodes to Ending Node ***/
const reactFlowNodes = await buildLangchain(
startingNodeIds,
nodes,
graph,
depthQueue,
componentNodes,
incomingInput.question,
chatId,
childAppDataSource,
incomingInput?.overrideConfig
)
logger.debug(`[server] [mode:child]: Running ${nodeToExecuteData.label} (${nodeToExecuteData.id})`)
const result = await nodeInstance.run(nodeToExecuteData, incomingInput.question, { chatHistory: incomingInput.history })
logger.debug(`[server] [mode:child]: Finished running ${nodeToExecuteData.label} (${nodeToExecuteData.id})`)
const nodeToExecute = reactFlowNodes.find((node: IReactFlowNode) => node.id === endingNodeId)
if (!nodeToExecute) {
await sendToParentProcess('error', `Node ${endingNodeId} not found`)
return
}
const reactFlowNodeData: INodeData = resolveVariables(nodeToExecute.data, reactFlowNodes, incomingInput.question)
nodeToExecuteData = reactFlowNodeData
const startingNodes = nodes.filter((nd) => startingNodeIds.includes(nd.id))
addToChatFlowPool = {
chatflowid: chatflow.id,
nodeToExecuteData,
startingNodes,
overrideConfig: incomingInput?.overrideConfig
}
await sendToParentProcess('finish', { result, addToChatFlowPool })
} catch (e: any) {
await sendToParentProcess('error', e.message)
logger.error('[server] [mode:child]: Error:', e)
}
const nodeInstanceFilePath = componentNodes[nodeToExecuteData.name].filePath as string
const nodeModule = await import(nodeInstanceFilePath)
const nodeInstance = new nodeModule.nodeClass()
const result = await nodeInstance.run(nodeToExecuteData, incomingInput.question, { chatHistory: incomingInput.history })
await sendToParentProcess('finish', { result, addToChatFlowPool })
}
}
+2
View File
@@ -23,6 +23,7 @@ export default class Start extends Command {
DATABASE_PATH: Flags.string(),
APIKEY_PATH: Flags.string(),
LOG_PATH: Flags.string(),
LOG_LEVEL: Flags.string(),
EXECUTION_MODE: Flags.string()
}
@@ -61,6 +62,7 @@ export default class Start extends Command {
if (flags.DATABASE_PATH) process.env.DATABASE_PATH = flags.DATABASE_PATH
if (flags.APIKEY_PATH) process.env.APIKEY_PATH = flags.APIKEY_PATH
if (flags.LOG_PATH) process.env.LOG_PATH = flags.LOG_PATH
if (flags.LOG_LEVEL) process.env.LOG_LEVEL = flags.LOG_LEVEL
if (flags.EXECUTION_MODE) process.env.EXECUTION_MODE = flags.EXECUTION_MODE
if (flags.DEBUG) process.env.DEBUG = flags.DEBUG
+28 -8
View File
@@ -283,10 +283,16 @@ export class App {
const nodes = parsedFlowData.nodes
const edges = parsedFlowData.edges
const { graph, nodeDependencies } = constructGraphs(nodes, edges)
const endingNodeId = getEndingNode(nodeDependencies, graph)
if (!endingNodeId) return res.status(500).send(`Ending node must be either a Chain or Agent`)
if (!endingNodeId) return res.status(500).send(`Ending node ${endingNodeId} not found`)
const endingNodeData = nodes.find((nd) => nd.id === endingNodeId)?.data
if (!endingNodeData) return res.status(500).send(`Ending node must be either a Chain or Agent`)
if (!endingNodeData) return res.status(500).send(`Ending node ${endingNodeId} data not found`)
if (endingNodeData && endingNodeData.category !== 'Chains' && endingNodeData.category !== 'Agents') {
return res.status(500).send(`Ending node must be either a Chain or Agent`)
}
const obj = {
isStreaming: isFlowValidForStream(nodes, endingNodeData)
@@ -638,7 +644,7 @@ export class App {
})
})
} catch (err) {
logger.error(err)
logger.error('[server] [mode:child]: Error:', err)
}
}
@@ -714,9 +720,11 @@ export class App {
if (process.env.EXECUTION_MODE === 'child') {
if (isFlowReusable()) {
nodeToExecuteData = this.chatflowPool.activeChatflows[chatflowid].endingNodeData
logger.debug(
`[server] [mode:child]: Reuse existing chatflow ${chatflowid} with ending node ${nodeToExecuteData.label} (${nodeToExecuteData.id})`
)
try {
const result = await this.startChildProcess(chatflow, chatId, incomingInput, nodeToExecuteData)
return res.json(result)
} catch (error) {
return res.status(500).send(error)
@@ -739,15 +747,22 @@ export class App {
if (isFlowReusable()) {
nodeToExecuteData = this.chatflowPool.activeChatflows[chatflowid].endingNodeData
isStreamValid = isFlowValidForStream(nodes, nodeToExecuteData)
logger.debug(
`[server]: Reuse existing chatflow ${chatflowid} with ending node ${nodeToExecuteData.label} (${nodeToExecuteData.id})`
)
} else {
/*** Get Ending Node with Directed Graph ***/
const { graph, nodeDependencies } = constructGraphs(nodes, edges)
const directedGraph = graph
const endingNodeId = getEndingNode(nodeDependencies, directedGraph)
if (!endingNodeId) return res.status(500).send(`Ending node must be either a Chain or Agent`)
if (!endingNodeId) return res.status(500).send(`Ending node ${endingNodeId} not found`)
const endingNodeData = nodes.find((nd) => nd.id === endingNodeId)?.data
if (!endingNodeData) return res.status(500).send(`Ending node must be either a Chain or Agent`)
if (!endingNodeData) return res.status(500).send(`Ending node ${endingNodeId} data not found`)
if (endingNodeData && endingNodeData.category !== 'Chains' && endingNodeData.category !== 'Agents') {
return res.status(500).send(`Ending node must be either a Chain or Agent`)
}
if (
endingNodeData.outputs &&
@@ -768,6 +783,7 @@ export class App {
const nonDirectedGraph = constructedObj.graph
const { startingNodeIds, depthQueue } = getStartingNodes(nonDirectedGraph, endingNodeId)
logger.debug(`[server]: Start building chatflow ${chatflowid}`)
/*** BFS to traverse from Starting Nodes to Ending Node ***/
const reactFlowNodes = await buildLangchain(
startingNodeIds,
@@ -796,17 +812,21 @@ export class App {
const nodeInstance = new nodeModule.nodeClass()
isStreamValid = isStreamValid && !isVectorStoreFaiss(nodeToExecuteData)
logger.debug(`[server]: Running ${nodeToExecuteData.label} (${nodeToExecuteData.id})`)
const result = isStreamValid
? await nodeInstance.run(nodeToExecuteData, incomingInput.question, {
chatHistory: incomingInput.history,
socketIO,
socketIOClientId: incomingInput.socketIOClientId
socketIOClientId: incomingInput.socketIOClientId,
logger
})
: await nodeInstance.run(nodeToExecuteData, incomingInput.question, { chatHistory: incomingInput.history })
: await nodeInstance.run(nodeToExecuteData, incomingInput.question, { chatHistory: incomingInput.history, logger })
logger.debug(`[server]: Finished running ${nodeToExecuteData.label} (${nodeToExecuteData.id})`)
return res.json(result)
}
} catch (e: any) {
logger.error('[server]: Error:', e)
return res.status(500).send(e.message)
}
}
+2 -2
View File
@@ -9,12 +9,12 @@ dotenv.config({ path: path.join(__dirname, '..', '..', '.env'), override: true }
const loggingConfig = {
dir: process.env.LOG_PATH ?? path.join(__dirname, '..', '..', '..', '..', 'logs'),
server: {
level: 'info',
level: process.env.LOG_LEVEL ?? 'info',
filename: 'server.log',
errorFilename: 'server-error.log'
},
express: {
level: 'info',
level: process.env.LOG_LEVEL ?? 'info',
format: 'jsonl', // can't be changed currently
filename: 'server-requests.log.jsonl' // should end with .jsonl
}
+7 -1
View File
@@ -180,6 +180,9 @@ export const getEndingNode = (nodeDependencies: INodeDependencies, graph: INodeD
* @param {IDepthQueue} depthQueue
* @param {IComponentNodes} componentNodes
* @param {string} question
* @param {string} chatId
* @param {DataSource} appDataSource
* @param {ICommonObject} overrideConfig
*/
export const buildLangchain = async (
startingNodeIds: string[],
@@ -222,11 +225,14 @@ export const buildLangchain = async (
if (overrideConfig) flowNodeData = replaceInputsWithConfig(flowNodeData, overrideConfig)
const reactFlowNodeData: INodeData = resolveVariables(flowNodeData, flowNodes, question)
logger.debug(`[server]: Initializing ${reactFlowNode.data.label} (${reactFlowNode.data.id})`)
flowNodes[nodeIndex].data.instance = await newNodeInstance.init(reactFlowNodeData, question, {
chatId,
appDataSource,
databaseEntities
databaseEntities,
logger
})
logger.debug(`[server]: Finished initializing ${reactFlowNode.data.label} (${reactFlowNode.data.id})`)
} catch (e: any) {
logger.error(e)
throw new Error(e)
+8 -6
View File
@@ -4,7 +4,7 @@ import config from './config' // should be replaced by node-config or similar
import { createLogger, transports, format } from 'winston'
import { NextFunction, Request, Response } from 'express'
const { combine, timestamp, printf } = format
const { combine, timestamp, printf, errors } = format
// expect the log dir be relative to the projects root
const logDir = config.logging.dir
@@ -18,9 +18,11 @@ const logger = createLogger({
format: combine(
timestamp({ format: 'YYYY-MM-DD HH:mm:ss' }),
format.json(),
printf(({ level, message, timestamp }) => {
return `${timestamp} [${level.toUpperCase()}]: ${message}`
})
printf(({ level, message, timestamp, stack }) => {
const text = `${timestamp} [${level.toUpperCase()}]: ${message}`
return stack ? text + '\n' + stack : text
}),
errors({ stack: true })
),
defaultMeta: {
package: 'server'
@@ -56,7 +58,7 @@ const logger = createLogger({
*/
export function expressRequestLogger(req: Request, res: Response, next: NextFunction): void {
const fileLogger = createLogger({
format: combine(timestamp({ format: 'YYYY-MM-DD HH:mm:ss' }), format.json()),
format: combine(timestamp({ format: 'YYYY-MM-DD HH:mm:ss' }), format.json(), errors({ stack: true })),
defaultMeta: {
package: 'server',
request: {
@@ -71,7 +73,7 @@ export function expressRequestLogger(req: Request, res: Response, next: NextFunc
transports: [
new transports.File({
filename: path.join(logDir, config.logging.express.filename ?? 'server-requests.log.jsonl'),
level: 'debug'
level: config.logging.express.level ?? 'debug'
})
]
})