Feature/sse (#3125)

* Base changes for ServerSide Events (instead of socket.io)

* lint fixes

* adding of interface and separate methods for streaming events

* lint

* first draft, handles both internal and external prediction end points.

* lint fixes

* additional internal end point for streaming and associated changes

* return streamresponse as true to build agent flow

* 1) JSON formatting for internal events
2) other fixes

* 1) convert internal event to metadata to maintain consistency with external response

* fix action and metadata streaming

* fix for error when agent flow is aborted

* prevent subflows from streaming and other code cleanup

* prevent streaming from enclosed tools

* add fix for preventing chaintool streaming

* update lock file

* add open when hidden to sse

* Streaming errors

* Streaming errors

* add fix for showing error message

---------

Co-authored-by: Henry <hzj94@hotmail.com>
This commit is contained in:
Vinod Kiran
2024-09-17 12:31:25 +05:30
committed by GitHub
parent 7a4c7efcab
commit 26444ac3ae
47 changed files with 1021 additions and 327 deletions
+208
View File
@@ -0,0 +1,208 @@
import express from 'express'
import { Response } from 'express'
import { IServerSideEventStreamer } from 'flowise-components'
// define a new type that has a client type (INTERNAL or EXTERNAL) and Response type
type Client = {
// future use
clientType: 'INTERNAL' | 'EXTERNAL'
response: Response
// optional property with default value
started?: boolean
}
export class SSEStreamer implements IServerSideEventStreamer {
clients: { [id: string]: Client } = {}
app: express.Application
constructor(app: express.Application) {
this.app = app
}
addExternalClient(chatId: string, res: Response) {
this.clients[chatId] = { clientType: 'EXTERNAL', response: res, started: false }
}
addClient(chatId: string, res: Response) {
this.clients[chatId] = { clientType: 'INTERNAL', response: res, started: false }
}
removeClient(chatId: string) {
const client = this.clients[chatId]
if (client) {
const clientResponse = {
event: 'end',
data: '[DONE]'
}
client.response.write('message\ndata:' + JSON.stringify(clientResponse) + '\n\n')
client.response.end()
delete this.clients[chatId]
}
}
// Send SSE message to a specific client
streamEvent(chatId: string, data: string) {
const client = this.clients[chatId]
if (client) {
const clientResponse = {
event: 'start',
data: data
}
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
}
}
streamCustomEvent(chatId: string, eventType: string, data: any) {
const client = this.clients[chatId]
if (client) {
const clientResponse = {
event: eventType,
data: data
}
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
}
}
streamStartEvent(chatId: string, data: string) {
const client = this.clients[chatId]
// prevent multiple start events being streamed to the client
if (client && !client.started) {
const clientResponse = {
event: 'start',
data: data
}
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
client.started = true
}
}
streamTokenEvent(chatId: string, data: string) {
const client = this.clients[chatId]
if (client) {
const clientResponse = {
event: 'token',
data: data
}
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
}
}
streamSourceDocumentsEvent(chatId: string, data: any) {
const client = this.clients[chatId]
if (client) {
const clientResponse = {
event: 'sourceDocuments',
data: data
}
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
}
}
streamUsedToolsEvent(chatId: string, data: any): void {
const client = this.clients[chatId]
if (client) {
const clientResponse = {
event: 'usedTools',
data: data
}
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
}
}
streamFileAnnotationsEvent(chatId: string, data: any): void {
const client = this.clients[chatId]
if (client) {
const clientResponse = {
event: 'fileAnnotations',
data: data
}
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
}
}
streamToolEvent(chatId: string, data: any): void {
const client = this.clients[chatId]
if (client) {
const clientResponse = {
event: 'tool',
data: data
}
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
}
}
streamAgentReasoningEvent(chatId: string, data: any): void {
const client = this.clients[chatId]
if (client) {
const clientResponse = {
event: 'agentReasoning',
data: data
}
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
}
}
streamNextAgentEvent(chatId: string, data: any): void {
const client = this.clients[chatId]
if (client) {
const clientResponse = {
event: 'nextAgent',
data: data
}
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
}
}
streamActionEvent(chatId: string, data: any): void {
const client = this.clients[chatId]
if (client) {
const clientResponse = {
event: 'action',
data: data
}
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
}
}
streamAbortEvent(chatId: string): void {
const client = this.clients[chatId]
if (client) {
const clientResponse = {
event: 'abort',
data: '[DONE]'
}
client.response.write('message\ndata:' + JSON.stringify(clientResponse) + '\n\n')
}
}
streamEndEvent(_: string) {
// placeholder for future use
}
streamErrorEvent(chatId: string, msg: string) {
const client = this.clients[chatId]
if (client) {
const clientResponse = {
event: 'error',
data: msg
}
client.response.write('message\ndata:' + JSON.stringify(clientResponse) + '\n\n')
}
}
streamMetadataEvent(chatId: string, apiResponse: any) {
const metadataJson: any = {}
if (apiResponse.chatId) {
metadataJson['chatId'] = apiResponse.chatId
}
if (apiResponse.chatMessageId) {
metadataJson['chatMessageId'] = apiResponse.chatMessageId
}
if (apiResponse.question) {
metadataJson['question'] = apiResponse.question
}
if (apiResponse.sessionId) {
metadataJson['sessionId'] = apiResponse.sessionId
}
if (apiResponse.memoryType) {
metadataJson['memoryType'] = apiResponse.memoryType
}
if (Object.keys(metadataJson).length > 0) {
this.streamCustomEvent(chatId, 'metadata', metadataJson)
}
}
}
+29 -27
View File
@@ -9,9 +9,9 @@ import {
ISeqAgentsState,
ISeqAgentNode,
IUsedTool,
IDocument
IDocument,
IServerSideEventStreamer
} from 'flowise-components'
import { Server } from 'socket.io'
import { omit, cloneDeep, flatten, uniq } from 'lodash'
import { StateGraph, END, START } from '@langchain/langgraph'
import { Document } from '@langchain/core/documents'
@@ -53,7 +53,6 @@ import logger from './logger'
* @param {ICommonObject} incomingInput
* @param {boolean} isInternal
* @param {string} baseURL
* @param {Server} socketIO
*/
export const buildAgentGraph = async (
chatflow: IChatFlow,
@@ -62,7 +61,8 @@ export const buildAgentGraph = async (
incomingInput: IncomingInput,
isInternal: boolean,
baseURL?: string,
socketIO?: Server
sseStreamer?: IServerSideEventStreamer,
shouldStreamResponse?: boolean
): Promise<any> => {
try {
const appServer = getRunningExpressApp()
@@ -287,28 +287,31 @@ export const buildAgentGraph = async (
? output[agentName].messages[output[agentName].messages.length - 1].content
: lastWorkerResult
if (socketIO && incomingInput.socketIOClientId) {
if (shouldStreamResponse) {
if (!isStreamingStarted) {
isStreamingStarted = true
socketIO.to(incomingInput.socketIOClientId).emit('start', agentReasoning)
if (sseStreamer) {
sseStreamer.streamStartEvent(chatId, agentReasoning)
}
}
socketIO.to(incomingInput.socketIOClientId).emit('agentReasoning', agentReasoning)
if (sseStreamer) {
sseStreamer.streamAgentReasoningEvent(chatId, agentReasoning)
}
// Send loading next agent indicator
if (reasoning.next && reasoning.next !== 'FINISH' && reasoning.next !== 'END') {
socketIO
.to(incomingInput.socketIOClientId)
.emit('nextAgent', mapNameToLabel[reasoning.next].label || reasoning.next)
if (sseStreamer) {
sseStreamer.streamNextAgentEvent(chatId, mapNameToLabel[reasoning.next].label || reasoning.next)
}
}
}
}
} else {
finalResult = output.__end__.messages.length ? output.__end__.messages.pop()?.content : ''
if (Array.isArray(finalResult)) finalResult = output.__end__.instructions
if (socketIO && incomingInput.socketIOClientId) {
socketIO.to(incomingInput.socketIOClientId).emit('token', finalResult)
if (shouldStreamResponse && sseStreamer) {
sseStreamer.streamTokenEvent(chatId, finalResult)
}
}
}
@@ -321,9 +324,8 @@ export const buildAgentGraph = async (
if (!isSequential && !finalResult) {
if (lastWorkerResult) finalResult = lastWorkerResult
else if (finalSummarization) finalResult = finalSummarization
if (socketIO && incomingInput.socketIOClientId) {
socketIO.to(incomingInput.socketIOClientId).emit('token', finalResult)
if (shouldStreamResponse && sseStreamer) {
sseStreamer.streamTokenEvent(chatId, finalResult)
}
}
@@ -377,16 +379,16 @@ export const buildAgentGraph = async (
{ type: 'reject-button', label: rejectButtonText }
]
}
if (socketIO && incomingInput.socketIOClientId) {
socketIO.to(incomingInput.socketIOClientId).emit('token', finalResult)
socketIO.to(incomingInput.socketIOClientId).emit('action', finalAction)
if (shouldStreamResponse && sseStreamer) {
sseStreamer.streamTokenEvent(chatId, finalResult)
sseStreamer.streamActionEvent(chatId, finalAction)
}
}
totalUsedTools.push(...mappedToolCalls)
} else if (lastAgentReasoningMessage) {
finalResult = lastAgentReasoningMessage
if (socketIO && incomingInput.socketIOClientId) {
socketIO.to(incomingInput.socketIOClientId).emit('token', finalResult)
if (shouldStreamResponse && sseStreamer) {
sseStreamer.streamTokenEvent(chatId, finalResult)
}
}
}
@@ -394,10 +396,10 @@ export const buildAgentGraph = async (
totalSourceDocuments = uniq(flatten(totalSourceDocuments))
totalUsedTools = uniq(flatten(totalUsedTools))
if (socketIO && incomingInput.socketIOClientId) {
socketIO.to(incomingInput.socketIOClientId).emit('usedTools', totalUsedTools)
socketIO.to(incomingInput.socketIOClientId).emit('sourceDocuments', totalSourceDocuments)
socketIO.to(incomingInput.socketIOClientId).emit('end')
if (shouldStreamResponse && sseStreamer) {
sseStreamer.streamUsedToolsEvent(chatId, totalUsedTools)
sseStreamer.streamSourceDocumentsEvent(chatId, totalSourceDocuments)
sseStreamer.streamEndEvent(chatId)
}
return {
@@ -412,8 +414,8 @@ export const buildAgentGraph = async (
// clear agent memory because checkpoints were saved during runtime
await clearSessionMemory(nodes, appServer.nodesPool.componentNodes, chatId, appServer.AppDataSource, sessionId)
if (getErrorMessage(e).includes('Aborted')) {
if (socketIO && incomingInput.socketIOClientId) {
socketIO.to(incomingInput.socketIOClientId).emit('abort')
if (shouldStreamResponse && sseStreamer) {
sseStreamer.streamAbortEvent(chatId)
}
return { finalResult, agentReasoning }
}
+32 -25
View File
@@ -5,7 +5,8 @@ import {
ICommonObject,
addSingleFileToStorage,
addArrayFilesToStorage,
mapMimeTypeToInputField
mapMimeTypeToInputField,
IServerSideEventStreamer
} from 'flowise-components'
import { StatusCodes } from 'http-status-codes'
import {
@@ -22,7 +23,6 @@ import {
} from '../Interface'
import { InternalFlowiseError } from '../errors/internalFlowiseError'
import { ChatFlow } from '../database/entities/ChatFlow'
import { Server } from 'socket.io'
import { getRunningExpressApp } from '../utils/getRunningExpressApp'
import {
isFlowValidForStream,
@@ -56,10 +56,9 @@ import { IAction } from 'flowise-components'
/**
* Build Chatflow
* @param {Request} req
* @param {Server} socketIO
* @param {boolean} isInternal
*/
export const utilBuildChatflow = async (req: Request, socketIO?: Server, isInternal: boolean = false): Promise<any> => {
export const utilBuildChatflow = async (req: Request, isInternal: boolean = false): Promise<any> => {
try {
const appServer = getRunningExpressApp()
const chatflowid = req.params.id
@@ -78,7 +77,6 @@ export const utilBuildChatflow = async (req: Request, socketIO?: Server, isInter
const chatId = incomingInput.chatId ?? incomingInput.overrideConfig?.sessionId ?? uuidv4()
const userMessageDateTime = new Date()
if (!isInternal) {
const isKeyValidated = await validateChatflowAPIKey(req, chatflow)
if (!isKeyValidated) {
@@ -161,8 +159,7 @@ export const utilBuildChatflow = async (req: Request, socketIO?: Server, isInter
}
incomingInput = {
question: req.body.question ?? 'hello',
overrideConfig,
socketIOClientId: req.body.socketIOClientId
overrideConfig
}
}
@@ -181,7 +178,6 @@ export const utilBuildChatflow = async (req: Request, socketIO?: Server, isInter
const { graph, nodeDependencies } = constructGraphs(nodes, edges)
const directedGraph = graph
const endingNodes = getEndingNodes(nodeDependencies, directedGraph, nodes)
/*** If the graph is an agent graph, build the agent response ***/
if (endingNodes.filter((node) => node.data.category === 'Multi Agents' || node.data.category === 'Sequential Agents').length) {
return await utilBuildAgentResponse(
@@ -195,8 +191,9 @@ export const utilBuildChatflow = async (req: Request, socketIO?: Server, isInter
incomingInput,
nodes,
edges,
socketIO,
baseURL
baseURL,
appServer.sseStreamer,
true
)
}
@@ -320,9 +317,7 @@ export const utilBuildChatflow = async (req: Request, socketIO?: Server, isInter
cachePool: appServer.cachePool,
isUpsert: false,
uploads: incomingInput.uploads,
baseURL,
socketIO,
socketIOClientId: incomingInput.socketIOClientId
baseURL
})
const nodeToExecute =
@@ -373,9 +368,9 @@ export const utilBuildChatflow = async (req: Request, socketIO?: Server, isInter
databaseEntities,
analytic: chatflow.analytic,
uploads: incomingInput.uploads,
socketIO,
socketIOClientId: incomingInput.socketIOClientId,
prependMessages
prependMessages,
sseStreamer: appServer.sseStreamer,
shouldStreamResponse: isStreamValid
})
: await nodeInstance.run(nodeToExecuteData, incomingInput.question, {
chatId,
@@ -442,6 +437,8 @@ export const utilBuildChatflow = async (req: Request, socketIO?: Server, isInter
result.question = incomingInput.question
result.chatId = chatId
result.chatMessageId = chatMessage?.id
result.isStreamValid = isStreamValid
if (sessionId) result.sessionId = sessionId
if (memoryType) result.memoryType = memoryType
@@ -467,12 +464,22 @@ const utilBuildAgentResponse = async (
incomingInput: IncomingInput,
nodes: IReactFlowNode[],
edges: IReactFlowEdge[],
socketIO?: Server,
baseURL?: string
baseURL?: string,
sseStreamer?: IServerSideEventStreamer,
shouldStreamResponse?: boolean
) => {
try {
const appServer = getRunningExpressApp()
const streamResults = await buildAgentGraph(agentflow, chatId, sessionId, incomingInput, isInternal, baseURL, socketIO)
const streamResults = await buildAgentGraph(
agentflow,
chatId,
sessionId,
incomingInput,
isInternal,
baseURL,
sseStreamer,
shouldStreamResponse
)
if (streamResults) {
const { finalResult, finalAction, sourceDocuments, usedTools, agentReasoning } = streamResults
const userMessage: Omit<IChatMessage, 'id'> = {
@@ -498,10 +505,10 @@ const utilBuildAgentResponse = async (
memoryType,
sessionId
}
if (sourceDocuments.length) apiMessage.sourceDocuments = JSON.stringify(sourceDocuments)
if (usedTools.length) apiMessage.usedTools = JSON.stringify(usedTools)
if (agentReasoning.length) apiMessage.agentReasoning = JSON.stringify(agentReasoning)
if (Object.keys(finalAction).length) apiMessage.action = JSON.stringify(finalAction)
if (sourceDocuments?.length) apiMessage.sourceDocuments = JSON.stringify(sourceDocuments)
if (usedTools?.length) apiMessage.usedTools = JSON.stringify(usedTools)
if (agentReasoning?.length) apiMessage.agentReasoning = JSON.stringify(agentReasoning)
if (finalAction && Object.keys(finalAction).length) apiMessage.action = JSON.stringify(finalAction)
const chatMessage = await utilAddChatMessage(apiMessage)
await appServer.telemetry.sendTelemetry('agentflow_prediction_sent', {
@@ -548,8 +555,8 @@ const utilBuildAgentResponse = async (
result.chatMessageId = chatMessage?.id
if (sessionId) result.sessionId = sessionId
if (memoryType) result.memoryType = memoryType
if (agentReasoning.length) result.agentReasoning = agentReasoning
if (Object.keys(finalAction).length) result.action = finalAction
if (agentReasoning?.length) result.agentReasoning = agentReasoning
if (finalAction && Object.keys(finalAction).length) result.action = finalAction
return result
}
+2 -11
View File
@@ -1,7 +1,6 @@
import path from 'path'
import fs from 'fs'
import logger from './logger'
import { Server } from 'socket.io'
import {
IComponentCredentials,
IComponentNodes,
@@ -436,8 +435,6 @@ type BuildFlowParams = {
stopNodeId?: string
uploads?: IFileUpload[]
baseURL?: string
socketIO?: Server
socketIOClientId?: string
}
/**
@@ -462,9 +459,7 @@ export const buildFlow = async ({
isUpsert,
stopNodeId,
uploads,
baseURL,
socketIO,
socketIOClientId
baseURL
}: BuildFlowParams) => {
const flowNodes = cloneDeep(reactFlowNodes)
@@ -533,9 +528,7 @@ export const buildFlow = async ({
cachePool,
dynamicVariables,
uploads,
baseURL,
socketIO,
socketIOClientId
baseURL
})
if (indexResult) upsertHistory['result'] = indexResult
logger.debug(`[server]: Finished upserting ${reactFlowNode.data.label} (${reactFlowNode.data.id})`)
@@ -561,8 +554,6 @@ export const buildFlow = async ({
dynamicVariables,
uploads,
baseURL,
socketIO,
socketIOClientId,
componentNodes: componentNodes as ICommonObject
})