Merge pull request #149 from FlowiseAI/feature/Streaming

Feature/Streaming
This commit is contained in:
Ong Chung Yau
2023-05-26 09:01:40 +07:00
committed by GitHub
29 changed files with 902 additions and 301 deletions
+1
View File
@@ -57,6 +57,7 @@
"moment-timezone": "^0.5.34",
"multer": "^1.4.5-lts.1",
"reflect-metadata": "^0.1.13",
"socket.io": "^4.6.1",
"sqlite3": "^5.1.6",
"typeorm": "^0.3.6"
},
+1
View File
@@ -115,6 +115,7 @@ export interface IncomingInput {
question: string
history: IMessage[]
overrideConfig?: ICommonObject
socketIOClientId?: string
}
export interface IActiveChatflows {
+57 -13
View File
@@ -5,6 +5,7 @@ import cors from 'cors'
import http from 'http'
import * as fs from 'fs'
import basicAuth from 'express-basic-auth'
import { Server } from 'socket.io'
import {
IChatFlow,
@@ -32,7 +33,8 @@ import {
mapMimeTypeToInputField,
findAvailableConfigs,
isSameOverrideConfig,
replaceAllAPIKeys
replaceAllAPIKeys,
isFlowValidForStream
} from './utils'
import { cloneDeep } from 'lodash'
import { getDataSource } from './DataSource'
@@ -73,7 +75,7 @@ export class App {
})
}
async config() {
async config(socketIO?: Server) {
// Limit is needed to allow sending/receiving base64 encoded string
this.app.use(express.json({ limit: '50mb' }))
this.app.use(express.urlencoded({ limit: '50mb', extended: true }))
@@ -200,6 +202,30 @@ export class App {
return res.json(results)
})
// Check if chatflow valid for streaming
this.app.get('/api/v1/chatflows-streaming/:id', async (req: Request, res: Response) => {
const chatflow = await this.AppDataSource.getRepository(ChatFlow).findOneBy({
id: req.params.id
})
if (!chatflow) return res.status(404).send(`Chatflow ${req.params.id} not found`)
/*** Get Ending Node with Directed Graph ***/
const flowData = chatflow.flowData
const parsedFlowData: IReactFlowObject = JSON.parse(flowData)
const nodes = parsedFlowData.nodes
const edges = parsedFlowData.edges
const { graph, nodeDependencies } = constructGraphs(nodes, edges)
const endingNodeId = getEndingNode(nodeDependencies, graph)
if (!endingNodeId) return res.status(500).send(`Ending node must be either a Chain or Agent`)
const endingNodeData = nodes.find((nd) => nd.id === endingNodeId)?.data
if (!endingNodeData) return res.status(500).send(`Ending node must be either a Chain or Agent`)
const obj = {
isStreaming: isFlowValidForStream(nodes, endingNodeData)
}
return res.json(obj)
})
// ----------------------------------------
// ChatMessage
// ----------------------------------------
@@ -303,12 +329,12 @@ export class App {
// Send input message and get prediction result (External)
this.app.post('/api/v1/prediction/:id', upload.array('files'), async (req: Request, res: Response) => {
await this.processPrediction(req, res)
await this.processPrediction(req, res, socketIO)
})
// Send input message and get prediction result (Internal)
this.app.post('/api/v1/internal-prediction/:id', async (req: Request, res: Response) => {
await this.processPrediction(req, res, true)
await this.processPrediction(req, res, socketIO, true)
})
// ----------------------------------------
@@ -464,9 +490,10 @@ export class App {
* Process Prediction
* @param {Request} req
* @param {Response} res
* @param {Server} socketIO
* @param {boolean} isInternal
*/
async processPrediction(req: Request, res: Response, isInternal = false) {
async processPrediction(req: Request, res: Response, socketIO?: Server, isInternal = false) {
try {
const chatflowid = req.params.id
let incomingInput: IncomingInput = req.body
@@ -482,6 +509,8 @@ export class App {
await this.validateKey(req, res, chatflow)
}
let isStreamValid = false
const files = (req.files as any[]) || []
if (files.length) {
@@ -542,15 +571,16 @@ export class App {
}
}
} else {
/*** Get chatflows and prepare data ***/
const flowData = chatflow.flowData
const parsedFlowData: IReactFlowObject = JSON.parse(flowData)
const nodes = parsedFlowData.nodes
const edges = parsedFlowData.edges
if (isRebuildNeeded()) {
nodeToExecuteData = this.chatflowPool.activeChatflows[chatflowid].endingNodeData
isStreamValid = isFlowValidForStream(nodes, nodeToExecuteData)
} else {
/*** Get chatflows and prepare data ***/
const flowData = chatflow.flowData
const parsedFlowData: IReactFlowObject = JSON.parse(flowData)
const nodes = parsedFlowData.nodes
const edges = parsedFlowData.edges
/*** Get Ending Node with Directed Graph ***/
const { graph, nodeDependencies } = constructGraphs(nodes, edges)
const directedGraph = graph
@@ -572,6 +602,8 @@ export class App {
)
}
isStreamValid = isFlowValidForStream(nodes, endingNodeData)
/*** Get Starting Nodes with Non-Directed Graph ***/
const constructedObj = constructGraphs(nodes, edges, true)
const nonDirectedGraph = constructedObj.graph
@@ -602,7 +634,13 @@ export class App {
const nodeModule = await import(nodeInstanceFilePath)
const nodeInstance = new nodeModule.nodeClass()
const result = await nodeInstance.run(nodeToExecuteData, incomingInput.question, { chatHistory: incomingInput.history })
const result = isStreamValid
? await nodeInstance.run(nodeToExecuteData, incomingInput.question, {
chatHistory: incomingInput.history,
socketIO,
socketIOClientId: incomingInput.socketIOClientId
})
: await nodeInstance.run(nodeToExecuteData, incomingInput.question, { chatHistory: incomingInput.history })
return res.json(result)
}
@@ -629,8 +667,14 @@ export async function start(): Promise<void> {
const port = parseInt(process.env.PORT || '', 10) || 3000
const server = http.createServer(serverApp.app)
const io = new Server(server, {
cors: {
origin: '*'
}
})
await serverApp.initDatabase()
await serverApp.config()
await serverApp.config(io)
server.listen(port, () => {
console.info(`⚡️[server]: Flowise Server is listening at ${port}`)
+25
View File
@@ -610,3 +610,28 @@ export const findAvailableConfigs = (reactFlowNodes: IReactFlowNode[]) => {
return configs
}
/**
* Check to see if flow valid for stream
* @param {IReactFlowNode[]} reactFlowNodes
* @param {INodeData} endingNodeData
* @returns {boolean}
*/
export const isFlowValidForStream = (reactFlowNodes: IReactFlowNode[], endingNodeData: INodeData) => {
const streamAvailableLLMs = {
'Chat Models': ['azureChatOpenAI', 'chatOpenAI', 'chatAnthropic'],
LLMs: ['azureOpenAI', 'openAI']
}
let isChatOrLLMsExist = false
for (const flowNode of reactFlowNodes) {
const data = flowNode.data
if (data.category === 'Chat Models' || data.category === 'LLMs') {
isChatOrLLMsExist = true
const validLLMs = streamAvailableLLMs[data.category]
if (!validLLMs.includes(data.name)) return false
}
}
return isChatOrLLMsExist && endingNodeData.category === 'Chains'
}