mirror of
https://github.com/farcasclaudiu/Flowise.git
synced 2026-06-28 19:00:59 +03:00
add API authorization
This commit is contained in:
@@ -9,6 +9,7 @@ export interface IChatFlow {
|
||||
id: string
|
||||
name: string
|
||||
flowData: string
|
||||
apikeyid: string
|
||||
deployed: boolean
|
||||
updatedDate: Date
|
||||
createdDate: Date
|
||||
|
||||
@@ -13,6 +13,9 @@ export class ChatFlow implements IChatFlow {
|
||||
@Column()
|
||||
flowData: string
|
||||
|
||||
@Column({ nullable: true })
|
||||
apikeyid: string
|
||||
|
||||
@Column()
|
||||
deployed: boolean
|
||||
|
||||
|
||||
+146
-86
@@ -12,7 +12,12 @@ import {
|
||||
getEndingNode,
|
||||
constructGraphs,
|
||||
resolveVariables,
|
||||
isStartNodeDependOnInput
|
||||
isStartNodeDependOnInput,
|
||||
getAPIKeys,
|
||||
addAPIKey,
|
||||
updateAPIKey,
|
||||
deleteAPIKey,
|
||||
compareKeys
|
||||
} from './utils'
|
||||
import { cloneDeep } from 'lodash'
|
||||
import { getDataSource } from './DataSource'
|
||||
@@ -42,6 +47,9 @@ export class App {
|
||||
await this.nodesPool.initialize()
|
||||
|
||||
this.chatflowPool = new ChatflowPool()
|
||||
|
||||
// Initialize API keys
|
||||
await getAPIKeys()
|
||||
})
|
||||
.catch((err) => {
|
||||
console.error('❌[server]: Error during Data Source initialization:', err)
|
||||
@@ -195,93 +203,14 @@ export class App {
|
||||
// Prediction
|
||||
// ----------------------------------------
|
||||
|
||||
// Send input message and get prediction result
|
||||
// Send input message and get prediction result (External)
|
||||
this.app.post('/api/v1/prediction/:id', async (req: Request, res: Response) => {
|
||||
try {
|
||||
const chatflowid = req.params.id
|
||||
const incomingInput: IncomingInput = req.body
|
||||
await this.processPrediction(req, res)
|
||||
})
|
||||
|
||||
let nodeToExecuteData: INodeData
|
||||
|
||||
/* Check if:
|
||||
* - Node Data already exists in pool
|
||||
* - Still in sync (i.e the flow has not been modified since)
|
||||
* - Flow doesn't start with nodes that depend on incomingInput.question
|
||||
***/
|
||||
if (
|
||||
Object.prototype.hasOwnProperty.call(this.chatflowPool.activeChatflows, chatflowid) &&
|
||||
this.chatflowPool.activeChatflows[chatflowid].inSync &&
|
||||
!isStartNodeDependOnInput(this.chatflowPool.activeChatflows[chatflowid].startingNodes)
|
||||
) {
|
||||
nodeToExecuteData = this.chatflowPool.activeChatflows[chatflowid].endingNodeData
|
||||
} else {
|
||||
/*** Get chatflows and prepare data ***/
|
||||
const chatflow = await this.AppDataSource.getRepository(ChatFlow).findOneBy({
|
||||
id: chatflowid
|
||||
})
|
||||
if (!chatflow) return res.status(404).send(`Chatflow ${chatflowid} not found`)
|
||||
|
||||
const flowData = chatflow.flowData
|
||||
const parsedFlowData: IReactFlowObject = JSON.parse(flowData)
|
||||
const nodes = parsedFlowData.nodes
|
||||
const edges = parsedFlowData.edges
|
||||
|
||||
/*** Get Ending Node with Directed Graph ***/
|
||||
const { graph, nodeDependencies } = constructGraphs(nodes, edges)
|
||||
const directedGraph = graph
|
||||
const endingNodeId = getEndingNode(nodeDependencies, directedGraph)
|
||||
if (!endingNodeId) return res.status(500).send(`Ending node must be either a Chain or Agent`)
|
||||
|
||||
const endingNodeData = nodes.find((nd) => nd.id === endingNodeId)?.data
|
||||
if (!endingNodeData) return res.status(500).send(`Ending node must be either a Chain or Agent`)
|
||||
|
||||
if (
|
||||
endingNodeData.outputs &&
|
||||
Object.keys(endingNodeData.outputs).length &&
|
||||
!Object.values(endingNodeData.outputs).includes(endingNodeData.name)
|
||||
) {
|
||||
return res
|
||||
.status(500)
|
||||
.send(
|
||||
`Output of ${endingNodeData.label} (${endingNodeData.id}) must be ${endingNodeData.label}, can't be an Output Prediction`
|
||||
)
|
||||
}
|
||||
|
||||
/*** Get Starting Nodes with Non-Directed Graph ***/
|
||||
const constructedObj = constructGraphs(nodes, edges, true)
|
||||
const nonDirectedGraph = constructedObj.graph
|
||||
const { startingNodeIds, depthQueue } = getStartingNodes(nonDirectedGraph, endingNodeId)
|
||||
|
||||
/*** BFS to traverse from Starting Nodes to Ending Node ***/
|
||||
const reactFlowNodes = await buildLangchain(
|
||||
startingNodeIds,
|
||||
nodes,
|
||||
graph,
|
||||
depthQueue,
|
||||
this.nodesPool.componentNodes,
|
||||
incomingInput.question
|
||||
)
|
||||
|
||||
const nodeToExecute = reactFlowNodes.find((node: IReactFlowNode) => node.id === endingNodeId)
|
||||
if (!nodeToExecute) return res.status(404).send(`Node ${endingNodeId} not found`)
|
||||
|
||||
const reactFlowNodeData: INodeData = resolveVariables(nodeToExecute.data, reactFlowNodes, incomingInput.question)
|
||||
nodeToExecuteData = reactFlowNodeData
|
||||
|
||||
const startingNodes = nodes.filter((nd) => startingNodeIds.includes(nd.id))
|
||||
this.chatflowPool.add(chatflowid, nodeToExecuteData, startingNodes)
|
||||
}
|
||||
|
||||
const nodeInstanceFilePath = this.nodesPool.componentNodes[nodeToExecuteData.name].filePath as string
|
||||
const nodeModule = await import(nodeInstanceFilePath)
|
||||
const nodeInstance = new nodeModule.nodeClass()
|
||||
|
||||
const result = await nodeInstance.run(nodeToExecuteData, incomingInput.question, { chatHistory: incomingInput.history })
|
||||
|
||||
return res.json(result)
|
||||
} catch (e: any) {
|
||||
return res.status(500).send(e.message)
|
||||
}
|
||||
// Send input message and get prediction result (Internal)
|
||||
this.app.post('/api/v1/internal-prediction/:id', async (req: Request, res: Response) => {
|
||||
await this.processPrediction(req, res, true)
|
||||
})
|
||||
|
||||
// ----------------------------------------
|
||||
@@ -308,6 +237,34 @@ export class App {
|
||||
return res.json(templates)
|
||||
})
|
||||
|
||||
// ----------------------------------------
|
||||
// API Keys
|
||||
// ----------------------------------------
|
||||
|
||||
// Get api keys
|
||||
this.app.get('/api/v1/apikey', async (req: Request, res: Response) => {
|
||||
const keys = await getAPIKeys()
|
||||
return res.json(keys)
|
||||
})
|
||||
|
||||
// Add new api key
|
||||
this.app.post('/api/v1/apikey', async (req: Request, res: Response) => {
|
||||
const keys = await addAPIKey(req.body.keyName)
|
||||
return res.json(keys)
|
||||
})
|
||||
|
||||
// Update api key
|
||||
this.app.put('/api/v1/apikey/:id', async (req: Request, res: Response) => {
|
||||
const keys = await updateAPIKey(req.params.id, req.body.keyName)
|
||||
return res.json(keys)
|
||||
})
|
||||
|
||||
// Delete new api key
|
||||
this.app.delete('/api/v1/apikey/:id', async (req: Request, res: Response) => {
|
||||
const keys = await deleteAPIKey(req.params.id)
|
||||
return res.json(keys)
|
||||
})
|
||||
|
||||
// ----------------------------------------
|
||||
// Serve UI static
|
||||
// ----------------------------------------
|
||||
@@ -324,6 +281,109 @@ export class App {
|
||||
})
|
||||
}
|
||||
|
||||
async processPrediction(req: Request, res: Response, isInternal = false) {
|
||||
try {
|
||||
const chatflowid = req.params.id
|
||||
const incomingInput: IncomingInput = req.body
|
||||
|
||||
let nodeToExecuteData: INodeData
|
||||
|
||||
const chatflow = await this.AppDataSource.getRepository(ChatFlow).findOneBy({
|
||||
id: chatflowid
|
||||
})
|
||||
if (!chatflow) return res.status(404).send(`Chatflow ${chatflowid} not found`)
|
||||
|
||||
if (!isInternal) {
|
||||
const chatFlowApiKeyId = chatflow.apikeyid
|
||||
const authorizationHeader = (req.headers['Authorization'] as string) ?? (req.headers['authorization'] as string) ?? ''
|
||||
|
||||
if (chatFlowApiKeyId && !authorizationHeader) return res.status(401).send(`Unauthorized`)
|
||||
|
||||
const suppliedKey = authorizationHeader.split(`Bearer `).pop()
|
||||
if (chatFlowApiKeyId && suppliedKey) {
|
||||
const keys = await getAPIKeys()
|
||||
const apiSecret = keys.find((key) => key.id === chatFlowApiKeyId)?.apiSecret
|
||||
if (!compareKeys(apiSecret, suppliedKey)) return res.status(401).send(`Unauthorized`)
|
||||
}
|
||||
}
|
||||
|
||||
/* Check if:
|
||||
* - Node Data already exists in pool
|
||||
* - Still in sync (i.e the flow has not been modified since)
|
||||
* - Flow doesn't start with nodes that depend on incomingInput.question
|
||||
***/
|
||||
if (
|
||||
Object.prototype.hasOwnProperty.call(this.chatflowPool.activeChatflows, chatflowid) &&
|
||||
this.chatflowPool.activeChatflows[chatflowid].inSync &&
|
||||
!isStartNodeDependOnInput(this.chatflowPool.activeChatflows[chatflowid].startingNodes)
|
||||
) {
|
||||
nodeToExecuteData = this.chatflowPool.activeChatflows[chatflowid].endingNodeData
|
||||
} else {
|
||||
/*** Get chatflows and prepare data ***/
|
||||
|
||||
const flowData = chatflow.flowData
|
||||
const parsedFlowData: IReactFlowObject = JSON.parse(flowData)
|
||||
const nodes = parsedFlowData.nodes
|
||||
const edges = parsedFlowData.edges
|
||||
|
||||
/*** Get Ending Node with Directed Graph ***/
|
||||
const { graph, nodeDependencies } = constructGraphs(nodes, edges)
|
||||
const directedGraph = graph
|
||||
const endingNodeId = getEndingNode(nodeDependencies, directedGraph)
|
||||
if (!endingNodeId) return res.status(500).send(`Ending node must be either a Chain or Agent`)
|
||||
|
||||
const endingNodeData = nodes.find((nd) => nd.id === endingNodeId)?.data
|
||||
if (!endingNodeData) return res.status(500).send(`Ending node must be either a Chain or Agent`)
|
||||
|
||||
if (
|
||||
endingNodeData.outputs &&
|
||||
Object.keys(endingNodeData.outputs).length &&
|
||||
!Object.values(endingNodeData.outputs).includes(endingNodeData.name)
|
||||
) {
|
||||
return res
|
||||
.status(500)
|
||||
.send(
|
||||
`Output of ${endingNodeData.label} (${endingNodeData.id}) must be ${endingNodeData.label}, can't be an Output Prediction`
|
||||
)
|
||||
}
|
||||
|
||||
/*** Get Starting Nodes with Non-Directed Graph ***/
|
||||
const constructedObj = constructGraphs(nodes, edges, true)
|
||||
const nonDirectedGraph = constructedObj.graph
|
||||
const { startingNodeIds, depthQueue } = getStartingNodes(nonDirectedGraph, endingNodeId)
|
||||
|
||||
/*** BFS to traverse from Starting Nodes to Ending Node ***/
|
||||
const reactFlowNodes = await buildLangchain(
|
||||
startingNodeIds,
|
||||
nodes,
|
||||
graph,
|
||||
depthQueue,
|
||||
this.nodesPool.componentNodes,
|
||||
incomingInput.question
|
||||
)
|
||||
|
||||
const nodeToExecute = reactFlowNodes.find((node: IReactFlowNode) => node.id === endingNodeId)
|
||||
if (!nodeToExecute) return res.status(404).send(`Node ${endingNodeId} not found`)
|
||||
|
||||
const reactFlowNodeData: INodeData = resolveVariables(nodeToExecute.data, reactFlowNodes, incomingInput.question)
|
||||
nodeToExecuteData = reactFlowNodeData
|
||||
|
||||
const startingNodes = nodes.filter((nd) => startingNodeIds.includes(nd.id))
|
||||
this.chatflowPool.add(chatflowid, nodeToExecuteData, startingNodes)
|
||||
}
|
||||
|
||||
const nodeInstanceFilePath = this.nodesPool.componentNodes[nodeToExecuteData.name].filePath as string
|
||||
const nodeModule = await import(nodeInstanceFilePath)
|
||||
const nodeInstance = new nodeModule.nodeClass()
|
||||
|
||||
const result = await nodeInstance.run(nodeToExecuteData, incomingInput.question, { chatHistory: incomingInput.history })
|
||||
|
||||
return res.json(result)
|
||||
} catch (e: any) {
|
||||
return res.status(500).send(e.message)
|
||||
}
|
||||
}
|
||||
|
||||
async stopApp() {
|
||||
try {
|
||||
const removePromises: any[] = []
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import path from 'path'
|
||||
import fs from 'fs'
|
||||
import moment from 'moment'
|
||||
import {
|
||||
IComponentNodes,
|
||||
IDepthQueue,
|
||||
@@ -14,6 +15,7 @@ import {
|
||||
} from '../Interface'
|
||||
import { cloneDeep, get } from 'lodash'
|
||||
import { ICommonObject, getInputVariables } from 'flowise-components'
|
||||
import { scryptSync, randomBytes, timingSafeEqual } from 'crypto'
|
||||
|
||||
const QUESTION_VAR_PREFIX = 'question'
|
||||
|
||||
@@ -362,3 +364,119 @@ export const isStartNodeDependOnInput = (startingNodes: IReactFlowNode[]): boole
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the api key path
|
||||
* @returns {string}
|
||||
*/
|
||||
export const getAPIKeyPath = (): string => {
|
||||
return path.join(__dirname, '..', '..', 'api.json')
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate the api key
|
||||
* @returns {string}
|
||||
*/
|
||||
export const generateAPIKey = (): string => {
|
||||
const buffer = randomBytes(32)
|
||||
return buffer.toString('base64')
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate the secret key
|
||||
* @param {string} apiKey
|
||||
* @returns {string}
|
||||
*/
|
||||
export const generateSecretHash = (apiKey: string): string => {
|
||||
const salt = randomBytes(8).toString('hex')
|
||||
const buffer = scryptSync(apiKey, salt, 64) as Buffer
|
||||
return `${buffer.toString('hex')}.${salt}`
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify valid keys
|
||||
* @param {string} storedKey
|
||||
* @param {string} suppliedKey
|
||||
* @returns {boolean}
|
||||
*/
|
||||
export const compareKeys = (storedKey: string, suppliedKey: string): boolean => {
|
||||
const [hashedPassword, salt] = storedKey.split('.')
|
||||
const buffer = scryptSync(suppliedKey, salt, 64) as Buffer
|
||||
return timingSafeEqual(Buffer.from(hashedPassword, 'hex'), buffer)
|
||||
}
|
||||
|
||||
/**
|
||||
* Get API keys
|
||||
* @returns {Promise<ICommonObject[]>}
|
||||
*/
|
||||
export const getAPIKeys = async (): Promise<ICommonObject[]> => {
|
||||
try {
|
||||
const content = await fs.promises.readFile(getAPIKeyPath(), 'utf8')
|
||||
return JSON.parse(content)
|
||||
} catch (error) {
|
||||
const keyName = 'DefaultKey'
|
||||
const apiKey = generateAPIKey()
|
||||
const apiSecret = generateSecretHash(apiKey)
|
||||
const content = [
|
||||
{
|
||||
keyName,
|
||||
apiKey,
|
||||
apiSecret,
|
||||
createdAt: moment().format('DD-MMM-YY'),
|
||||
id: randomBytes(16).toString('hex')
|
||||
}
|
||||
]
|
||||
await fs.promises.writeFile(getAPIKeyPath(), JSON.stringify(content), 'utf8')
|
||||
return content
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add new API key
|
||||
* @param {string} keyName
|
||||
* @returns {Promise<ICommonObject[]>}
|
||||
*/
|
||||
export const addAPIKey = async (keyName: string): Promise<ICommonObject[]> => {
|
||||
const existingAPIKeys = await getAPIKeys()
|
||||
const apiKey = generateAPIKey()
|
||||
const apiSecret = generateSecretHash(apiKey)
|
||||
const content = [
|
||||
...existingAPIKeys,
|
||||
{
|
||||
keyName,
|
||||
apiKey,
|
||||
apiSecret,
|
||||
createdAt: moment().format('DD-MMM-YY'),
|
||||
id: randomBytes(16).toString('hex')
|
||||
}
|
||||
]
|
||||
await fs.promises.writeFile(getAPIKeyPath(), JSON.stringify(content), 'utf8')
|
||||
return content
|
||||
}
|
||||
|
||||
/**
|
||||
* Update existing API key
|
||||
* @param {string} keyIdToUpdate
|
||||
* @param {string} newKeyName
|
||||
* @returns {Promise<ICommonObject[]>}
|
||||
*/
|
||||
export const updateAPIKey = async (keyIdToUpdate: string, newKeyName: string): Promise<ICommonObject[]> => {
|
||||
const existingAPIKeys = await getAPIKeys()
|
||||
const keyIndex = existingAPIKeys.findIndex((key) => key.id === keyIdToUpdate)
|
||||
if (keyIndex < 0) return []
|
||||
existingAPIKeys[keyIndex].keyName = newKeyName
|
||||
await fs.promises.writeFile(getAPIKeyPath(), JSON.stringify(existingAPIKeys), 'utf8')
|
||||
return existingAPIKeys
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete API key
|
||||
* @param {string} keyIdToDelete
|
||||
* @returns {Promise<ICommonObject[]>}
|
||||
*/
|
||||
export const deleteAPIKey = async (keyIdToDelete: string): Promise<ICommonObject[]> => {
|
||||
const existingAPIKeys = await getAPIKeys()
|
||||
const result = existingAPIKeys.filter((key) => key.id !== keyIdToDelete)
|
||||
await fs.promises.writeFile(getAPIKeyPath(), JSON.stringify(result), 'utf8')
|
||||
return result
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user