mirror of
https://github.com/farcasclaudiu/Flowise.git
synced 2026-06-28 15:00:57 +03:00
enable streaming
This commit is contained in:
@@ -115,6 +115,7 @@ export interface IncomingInput {
|
||||
question: string
|
||||
history: IMessage[]
|
||||
overrideConfig?: ICommonObject
|
||||
socketIOClientId?: string
|
||||
}
|
||||
|
||||
export interface IActiveChatflows {
|
||||
|
||||
@@ -5,6 +5,7 @@ import cors from 'cors'
|
||||
import http from 'http'
|
||||
import * as fs from 'fs'
|
||||
import basicAuth from 'express-basic-auth'
|
||||
import { Server } from 'socket.io'
|
||||
|
||||
import {
|
||||
IChatFlow,
|
||||
@@ -32,7 +33,8 @@ import {
|
||||
mapMimeTypeToInputField,
|
||||
findAvailableConfigs,
|
||||
isSameOverrideConfig,
|
||||
replaceAllAPIKeys
|
||||
replaceAllAPIKeys,
|
||||
isFlowValidForStream
|
||||
} from './utils'
|
||||
import { cloneDeep } from 'lodash'
|
||||
import { getDataSource } from './DataSource'
|
||||
@@ -73,7 +75,7 @@ export class App {
|
||||
})
|
||||
}
|
||||
|
||||
async config() {
|
||||
async config(socketIO?: Server) {
|
||||
// Limit is needed to allow sending/receiving base64 encoded string
|
||||
this.app.use(express.json({ limit: '50mb' }))
|
||||
this.app.use(express.urlencoded({ limit: '50mb', extended: true }))
|
||||
@@ -200,6 +202,30 @@ export class App {
|
||||
return res.json(results)
|
||||
})
|
||||
|
||||
// Check if chatflow valid for streaming
|
||||
this.app.get('/api/v1/chatflows-streaming/:id', async (req: Request, res: Response) => {
|
||||
const chatflow = await this.AppDataSource.getRepository(ChatFlow).findOneBy({
|
||||
id: req.params.id
|
||||
})
|
||||
if (!chatflow) return res.status(404).send(`Chatflow ${req.params.id} not found`)
|
||||
|
||||
/*** Get Ending Node with Directed Graph ***/
|
||||
const flowData = chatflow.flowData
|
||||
const parsedFlowData: IReactFlowObject = JSON.parse(flowData)
|
||||
const nodes = parsedFlowData.nodes
|
||||
const edges = parsedFlowData.edges
|
||||
const { graph, nodeDependencies } = constructGraphs(nodes, edges)
|
||||
const endingNodeId = getEndingNode(nodeDependencies, graph)
|
||||
if (!endingNodeId) return res.status(500).send(`Ending node must be either a Chain or Agent`)
|
||||
const endingNodeData = nodes.find((nd) => nd.id === endingNodeId)?.data
|
||||
if (!endingNodeData) return res.status(500).send(`Ending node must be either a Chain or Agent`)
|
||||
|
||||
const obj = {
|
||||
isStreaming: isFlowValidForStream(nodes, endingNodeData)
|
||||
}
|
||||
return res.json(obj)
|
||||
})
|
||||
|
||||
// ----------------------------------------
|
||||
// ChatMessage
|
||||
// ----------------------------------------
|
||||
@@ -303,12 +329,12 @@ export class App {
|
||||
|
||||
// Send input message and get prediction result (External)
|
||||
this.app.post('/api/v1/prediction/:id', upload.array('files'), async (req: Request, res: Response) => {
|
||||
await this.processPrediction(req, res)
|
||||
await this.processPrediction(req, res, socketIO)
|
||||
})
|
||||
|
||||
// Send input message and get prediction result (Internal)
|
||||
this.app.post('/api/v1/internal-prediction/:id', async (req: Request, res: Response) => {
|
||||
await this.processPrediction(req, res, true)
|
||||
await this.processPrediction(req, res, socketIO, true)
|
||||
})
|
||||
|
||||
// ----------------------------------------
|
||||
@@ -464,9 +490,10 @@ export class App {
|
||||
* Process Prediction
|
||||
* @param {Request} req
|
||||
* @param {Response} res
|
||||
* @param {Server} socketIO
|
||||
* @param {boolean} isInternal
|
||||
*/
|
||||
async processPrediction(req: Request, res: Response, isInternal = false) {
|
||||
async processPrediction(req: Request, res: Response, socketIO?: Server, isInternal = false) {
|
||||
try {
|
||||
const chatflowid = req.params.id
|
||||
let incomingInput: IncomingInput = req.body
|
||||
@@ -482,6 +509,8 @@ export class App {
|
||||
await this.validateKey(req, res, chatflow)
|
||||
}
|
||||
|
||||
let isStreamValid = false
|
||||
|
||||
const files = (req.files as any[]) || []
|
||||
|
||||
if (files.length) {
|
||||
@@ -542,15 +571,16 @@ export class App {
|
||||
}
|
||||
}
|
||||
} else {
|
||||
/*** Get chatflows and prepare data ***/
|
||||
const flowData = chatflow.flowData
|
||||
const parsedFlowData: IReactFlowObject = JSON.parse(flowData)
|
||||
const nodes = parsedFlowData.nodes
|
||||
const edges = parsedFlowData.edges
|
||||
|
||||
if (isRebuildNeeded()) {
|
||||
nodeToExecuteData = this.chatflowPool.activeChatflows[chatflowid].endingNodeData
|
||||
isStreamValid = isFlowValidForStream(nodes, nodeToExecuteData)
|
||||
} else {
|
||||
/*** Get chatflows and prepare data ***/
|
||||
const flowData = chatflow.flowData
|
||||
const parsedFlowData: IReactFlowObject = JSON.parse(flowData)
|
||||
const nodes = parsedFlowData.nodes
|
||||
const edges = parsedFlowData.edges
|
||||
|
||||
/*** Get Ending Node with Directed Graph ***/
|
||||
const { graph, nodeDependencies } = constructGraphs(nodes, edges)
|
||||
const directedGraph = graph
|
||||
@@ -572,6 +602,8 @@ export class App {
|
||||
)
|
||||
}
|
||||
|
||||
isStreamValid = isFlowValidForStream(nodes, endingNodeData)
|
||||
|
||||
/*** Get Starting Nodes with Non-Directed Graph ***/
|
||||
const constructedObj = constructGraphs(nodes, edges, true)
|
||||
const nonDirectedGraph = constructedObj.graph
|
||||
@@ -602,7 +634,13 @@ export class App {
|
||||
const nodeModule = await import(nodeInstanceFilePath)
|
||||
const nodeInstance = new nodeModule.nodeClass()
|
||||
|
||||
const result = await nodeInstance.run(nodeToExecuteData, incomingInput.question, { chatHistory: incomingInput.history })
|
||||
const result = isStreamValid
|
||||
? await nodeInstance.run(nodeToExecuteData, incomingInput.question, {
|
||||
chatHistory: incomingInput.history,
|
||||
socketIO,
|
||||
socketIOClientId: incomingInput.socketIOClientId
|
||||
})
|
||||
: await nodeInstance.run(nodeToExecuteData, incomingInput.question, { chatHistory: incomingInput.history })
|
||||
|
||||
return res.json(result)
|
||||
}
|
||||
@@ -629,8 +667,14 @@ export async function start(): Promise<void> {
|
||||
const port = parseInt(process.env.PORT || '', 10) || 3000
|
||||
const server = http.createServer(serverApp.app)
|
||||
|
||||
const io = new Server(server, {
|
||||
cors: {
|
||||
origin: '*'
|
||||
}
|
||||
})
|
||||
|
||||
await serverApp.initDatabase()
|
||||
await serverApp.config()
|
||||
await serverApp.config(io)
|
||||
|
||||
server.listen(port, () => {
|
||||
console.info(`⚡️[server]: Flowise Server is listening at ${port}`)
|
||||
|
||||
@@ -610,3 +610,28 @@ export const findAvailableConfigs = (reactFlowNodes: IReactFlowNode[]) => {
|
||||
|
||||
return configs
|
||||
}
|
||||
|
||||
/**
|
||||
* Check to see if flow valid for stream
|
||||
* @param {IReactFlowNode[]} reactFlowNodes
|
||||
* @param {INodeData} endingNodeData
|
||||
* @returns {boolean}
|
||||
*/
|
||||
export const isFlowValidForStream = (reactFlowNodes: IReactFlowNode[], endingNodeData: INodeData) => {
|
||||
const streamAvailableLLMs = {
|
||||
'Chat Models': ['azureChatOpenAI', 'chatOpenAI', 'chatAnthropic'],
|
||||
LLMs: ['azureOpenAI', 'openAI']
|
||||
}
|
||||
|
||||
let isChatOrLLMsExist = false
|
||||
for (const flowNode of reactFlowNodes) {
|
||||
const data = flowNode.data
|
||||
if (data.category === 'Chat Models' || data.category === 'LLMs') {
|
||||
isChatOrLLMsExist = true
|
||||
const validLLMs = streamAvailableLLMs[data.category]
|
||||
if (!validLLMs.includes(data.name)) return false
|
||||
}
|
||||
}
|
||||
|
||||
return isChatOrLLMsExist && endingNodeData.category === 'Chains'
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user