Feature/Code Interpreter (#3183)

* Base changes for ServerSide Events (instead of socket.io)

* lint fixes

* adding of interface and separate methods for streaming events

* lint

* first draft, handles both internal and external prediction end points.

* lint fixes

* additional internal end point for streaming and associated changes

* return streamresponse as true to build agent flow

* 1) JSON formatting for internal events
2) other fixes

* 1) convert internal event to metadata to maintain consistency with external response

* fix action and metadata streaming

* fix for error when agent flow is aborted

* prevent subflows from streaming and other code cleanup

* prevent streaming from enclosed tools

* add fix for preventing chaintool streaming

* update lock file

* add open when hidden to sse

* Streaming errors

* Streaming errors

* add fix for showing error message

* add code interpreter

* add artifacts to view message dialog

* Update pnpm-lock.yaml

---------

Co-authored-by: Vinod Paidimarry <vinodkiran@outlook.in>
This commit is contained in:
Henry Heng
2024-09-17 08:44:56 +01:00
committed by GitHub
parent 26444ac3ae
commit b02f279e9d
21 changed files with 729 additions and 333 deletions
+1
View File
@@ -42,6 +42,7 @@ export interface IChatMessage {
fileAnnotations?: string
agentReasoning?: string
fileUploads?: string
artifacts?: string
chatType: string
chatId: string
memoryType?: string
@@ -209,6 +209,9 @@ const parseAPIResponse = (apiResponse: ChatMessage | ChatMessage[]): ChatMessage
if (parsedResponse.action) {
parsedResponse.action = JSON.parse(parsedResponse.action)
}
if (parsedResponse.artifacts) {
parsedResponse.artifacts = JSON.parse(parsedResponse.artifacts)
}
return parsedResponse
}
@@ -32,6 +32,9 @@ export class ChatMessage implements IChatMessage {
@Column({ nullable: true, type: 'text' })
fileUploads?: string
@Column({ nullable: true, type: 'text' })
artifacts?: string
@Column({ nullable: true, type: 'text' })
action?: string | null
@@ -0,0 +1,11 @@
import { MigrationInterface, QueryRunner } from 'typeorm'
export class AddArtifactsToChatMessage1726156258465 implements MigrationInterface {
public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`ALTER TABLE "chat_message" ADD COLUMN "artifacts" TEXT;`)
}
public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`ALTER TABLE "chat_message" DROP COLUMN "artifacts";`)
}
}
@@ -23,6 +23,7 @@ import { AddAgentReasoningToChatMessage1714679514451 } from './1714679514451-Add
import { AddTypeToChatFlow1716300000000 } from './1716300000000-AddTypeToChatFlow'
import { AddApiKey1720230151480 } from './1720230151480-AddApiKey'
import { AddActionToChatMessage1721078251523 } from './1721078251523-AddActionToChatMessage'
import { AddArtifactsToChatMessage1726156258465 } from './1726156258465-AddArtifactsToChatMessage'
import { AddCustomTemplate1725629836652 } from './1725629836652-AddCustomTemplate'
export const sqliteMigrations = [
@@ -51,5 +52,6 @@ export const sqliteMigrations = [
AddVectorStoreConfigToDocStore1715861032479,
AddApiKey1720230151480,
AddActionToChatMessage1721078251523,
AddArtifactsToChatMessage1726156258465,
AddCustomTemplate1725629836652
]
+10
View File
@@ -97,6 +97,16 @@ export class SSEStreamer implements IServerSideEventStreamer {
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
}
}
streamArtifactsEvent(chatId: string, data: any) {
const client = this.clients[chatId]
if (client) {
const clientResponse = {
event: 'artifacts',
data: data
}
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
}
}
streamUsedToolsEvent(chatId: string, data: any): void {
const client = this.clients[chatId]
if (client) {
@@ -154,6 +154,7 @@ export const buildAgentGraph = async (
let finalAction: IAction = {}
let totalSourceDocuments: IDocument[] = []
let totalUsedTools: IUsedTool[] = []
let totalArtifacts: ICommonObject[] = []
const workerNodes = reactFlowNodes.filter((node) => node.data.name === 'worker')
const supervisorNodes = reactFlowNodes.filter((node) => node.data.name === 'supervisor')
@@ -221,6 +222,9 @@ export const buildAgentGraph = async (
const sourceDocuments = output[agentName]?.messages
? output[agentName].messages.map((msg: BaseMessage) => msg.additional_kwargs?.sourceDocuments)
: []
const artifacts = output[agentName]?.messages
? output[agentName].messages.map((msg: BaseMessage) => msg.additional_kwargs?.artifacts)
: []
const messages = output[agentName]?.messages
? output[agentName].messages.map((msg: BaseMessage) => (typeof msg === 'string' ? msg : msg.content))
: []
@@ -240,6 +244,11 @@ export const buildAgentGraph = async (
if (cleanedDocs.length) totalSourceDocuments.push(...cleanedDocs)
}
if (artifacts && artifacts.length) {
const cleanedArtifacts = artifacts.filter((artifact: ICommonObject) => artifact)
if (cleanedArtifacts.length) totalArtifacts.push(...cleanedArtifacts)
}
/*
* Check if the next node is a condition node, if yes, then add the agent reasoning of the condition node
*/
@@ -273,6 +282,7 @@ export const buildAgentGraph = async (
instructions: output[agentName]?.instructions,
usedTools: flatten(usedTools) as IUsedTool[],
sourceDocuments: flatten(sourceDocuments) as Document[],
artifacts: flatten(artifacts) as ICommonObject[],
state,
nodeName: isSequential ? mapNameToLabel[agentName].nodeName : undefined,
nodeId
@@ -395,10 +405,12 @@ export const buildAgentGraph = async (
totalSourceDocuments = uniq(flatten(totalSourceDocuments))
totalUsedTools = uniq(flatten(totalUsedTools))
totalArtifacts = uniq(flatten(totalArtifacts))
if (shouldStreamResponse && sseStreamer) {
sseStreamer.streamUsedToolsEvent(chatId, totalUsedTools)
sseStreamer.streamSourceDocumentsEvent(chatId, totalSourceDocuments)
sseStreamer.streamArtifactsEvent(chatId, totalArtifacts)
sseStreamer.streamEndEvent(chatId)
}
@@ -406,6 +418,7 @@ export const buildAgentGraph = async (
finalResult,
finalAction,
sourceDocuments: totalSourceDocuments,
artifacts: totalArtifacts,
usedTools: totalUsedTools,
agentReasoning
}
+4 -1
View File
@@ -420,6 +420,8 @@ export const utilBuildChatflow = async (req: Request, isInternal: boolean = fals
if (result?.sourceDocuments) apiMessage.sourceDocuments = JSON.stringify(result.sourceDocuments)
if (result?.usedTools) apiMessage.usedTools = JSON.stringify(result.usedTools)
if (result?.fileAnnotations) apiMessage.fileAnnotations = JSON.stringify(result.fileAnnotations)
if (result?.artifacts) apiMessage.artifacts = JSON.stringify(result.artifacts)
const chatMessage = await utilAddChatMessage(apiMessage)
logger.debug(`[server]: Finished running ${nodeToExecuteData.label} (${nodeToExecuteData.id})`)
@@ -481,7 +483,7 @@ const utilBuildAgentResponse = async (
shouldStreamResponse
)
if (streamResults) {
const { finalResult, finalAction, sourceDocuments, usedTools, agentReasoning } = streamResults
const { finalResult, finalAction, sourceDocuments, artifacts, usedTools, agentReasoning } = streamResults
const userMessage: Omit<IChatMessage, 'id'> = {
role: 'userMessage',
content: incomingInput.question,
@@ -506,6 +508,7 @@ const utilBuildAgentResponse = async (
sessionId
}
if (sourceDocuments?.length) apiMessage.sourceDocuments = JSON.stringify(sourceDocuments)
if (artifacts?.length) apiMessage.artifacts = JSON.stringify(artifacts)
if (usedTools?.length) apiMessage.usedTools = JSON.stringify(usedTools)
if (agentReasoning?.length) apiMessage.agentReasoning = JSON.stringify(agentReasoning)
if (finalAction && Object.keys(finalAction).length) apiMessage.action = JSON.stringify(finalAction)