diff --git a/packages/components/nodes/agents/AirtableAgent/AirtableAgent.ts b/packages/components/nodes/agents/AirtableAgent/AirtableAgent.ts new file mode 100644 index 00000000..5d42d8be --- /dev/null +++ b/packages/components/nodes/agents/AirtableAgent/AirtableAgent.ts @@ -0,0 +1,234 @@ +import { ICommonObject, INode, INodeData, INodeParams, PromptTemplate } from '../../../src/Interface' +import { AgentExecutor } from 'langchain/agents' +import { getBaseClasses } from '../../../src/utils' +import { LoadPyodide, finalSystemPrompt, systemPrompt } from './core' +import { LLMChain } from 'langchain/chains' +import { BaseLanguageModel } from 'langchain/base_language' +import { ConsoleCallbackHandler, CustomChainHandler } from '../../../src/handler' +import axios from 'axios' + +class Airtable_Agents implements INode { + label: string + name: string + description: string + type: string + icon: string + category: string + baseClasses: string[] + inputs: INodeParams[] + + constructor() { + this.label = 'Airtable Agent' + this.name = 'airtableAgent' + this.type = 'AgentExecutor' + this.category = 'Agents' + this.icon = 'airtable.svg' + this.description = 'Agent used to to answer queries on Airtable table' + this.baseClasses = [this.type, ...getBaseClasses(AgentExecutor)] + this.inputs = [ + { + label: 'Language Model', + name: 'model', + type: 'BaseLanguageModel' + }, + { + label: 'Personal Access Token', + name: 'accessToken', + type: 'password', + description: + 'Get personal access token from official guide' + }, + { + label: 'Base Id', + name: 'baseId', + type: 'string', + placeholder: 'app11RobdGoX0YNsC', + description: + 'If your table URL looks like: https://airtable.com/app11RobdGoX0YNsC/tblJdmvbrgizbYICO/viw9UrP77Id0CE4ee, app11RovdGoX0YNsC is the base id' + }, + { + label: 'Table Id', + name: 'tableId', + type: 'string', + placeholder: 'tblJdmvbrgizbYICO', + description: + 'If your table URL looks like: https://airtable.com/app11RobdGoX0YNsC/tblJdmvbrgizbYICO/viw9UrP77Id0CE4ee, tblJdmvbrgizbYICO is the table id' + }, + { + label: 'Return All', + name: 'returnAll', + type: 'boolean', + default: true, + additionalParams: true, + description: 'If all results should be returned or only up to a given limit' + }, + { + label: 'Limit', + name: 'limit', + type: 'number', + default: 100, + step: 1, + additionalParams: true, + description: 'Number of results to return' + } + ] + } + + async init(): Promise { + // Not used + return undefined + } + + async run(nodeData: INodeData, input: string, options: ICommonObject): Promise { + const model = nodeData.inputs?.model as BaseLanguageModel + const accessToken = nodeData.inputs?.accessToken as string + const baseId = nodeData.inputs?.baseId as string + const tableId = nodeData.inputs?.tableId as string + const returnAll = nodeData.inputs?.returnAll as boolean + const limit = nodeData.inputs?.limit as string + + let airtableData: ICommonObject[] = [] + + if (returnAll) { + airtableData = await loadAll(baseId, tableId, accessToken) + } else { + airtableData = await loadLimit(limit ? parseInt(limit, 10) : 100, baseId, tableId, accessToken) + } + + let base64String = Buffer.from(JSON.stringify(airtableData)).toString('base64') + + const loggerHandler = new ConsoleCallbackHandler(options.logger) + const handler = new CustomChainHandler(options.socketIO, options.socketIOClientId) + + const pyodide = await LoadPyodide() + + // First load the csv file and get the dataframe dictionary of column types + // For example using titanic.csv: {'PassengerId': 'int64', 'Survived': 'int64', 'Pclass': 'int64', 'Name': 'object', 'Sex': 'object', 'Age': 'float64', 'SibSp': 'int64', 'Parch': 'int64', 'Ticket': 'object', 'Fare': 'float64', 'Cabin': 'object', 'Embarked': 'object'} + let dataframeColDict = '' + try { + const code = `import pandas as pd +import base64 +import json + +base64_string = "${base64String}" + +decoded_data = base64.b64decode(base64_string) + +json_data = json.loads(decoded_data) + +df = pd.DataFrame(json_data) +my_dict = df.dtypes.astype(str).to_dict() +print(my_dict) +json.dumps(my_dict)` + dataframeColDict = await pyodide.runPythonAsync(code) + } catch (error) { + throw new Error(error) + } + options.logger.debug('[components/AirtableAgent] [1] DataframeColDict =>', dataframeColDict) + + // Then tell GPT to come out with ONLY python code + // For example: len(df), df[df['SibSp'] > 3]['PassengerId'].count() + let pythonCode = '' + if (dataframeColDict) { + const chain = new LLMChain({ + llm: model, + prompt: PromptTemplate.fromTemplate(systemPrompt), + verbose: process.env.DEBUG === 'true' ? true : false + }) + const inputs = { + dict: dataframeColDict, + question: input + } + const res = await chain.call(inputs, [loggerHandler]) + pythonCode = res?.text + } + options.logger.debug('[components/AirtableAgent] [2] Generated Python Code =>', pythonCode) + + // Then run the code using Pyodide + let finalResult = '' + if (pythonCode) { + try { + const code = `import pandas as pd\n${pythonCode}` + finalResult = await pyodide.runPythonAsync(code) + } catch (error) { + throw new Error(`Sorry, I'm unable to find answer for question: "${input}" using follwoing code: "${pythonCode}"`) + } + } + options.logger.debug('[components/AirtableAgent] [3] Pyodide Result =>', finalResult) + + // Finally, return a complete answer + if (finalResult) { + const chain = new LLMChain({ + llm: model, + prompt: PromptTemplate.fromTemplate(finalSystemPrompt), + verbose: process.env.DEBUG === 'true' ? true : false + }) + const inputs = { + question: input, + answer: finalResult + } + + if (options.socketIO && options.socketIOClientId) { + const result = await chain.call(inputs, [loggerHandler, handler]) + options.logger.debug('[components/AirtableAgent] [4] Final Result =>', result?.text) + return result?.text + } else { + const result = await chain.call(inputs, [loggerHandler]) + options.logger.debug('[components/AirtableAgent] [4] Final Result =>', result?.text) + return result?.text + } + } + + return pythonCode + } +} + +interface AirtableLoaderResponse { + records: AirtableLoaderPage[] + offset?: string +} + +interface AirtableLoaderPage { + id: string + createdTime: string + fields: ICommonObject +} + +const fetchAirtableData = async (url: string, params: ICommonObject, accessToken: string): Promise => { + try { + const headers = { + Authorization: `Bearer ${accessToken}`, + 'Content-Type': 'application/json', + Accept: 'application/json' + } + const response = await axios.get(url, { params, headers }) + return response.data + } catch (error) { + throw new Error(`Failed to fetch ${url} from Airtable: ${error}`) + } +} + +const loadAll = async (baseId: string, tableId: string, accessToken: string): Promise => { + const params: ICommonObject = { pageSize: 100 } + let data: AirtableLoaderResponse + let returnPages: AirtableLoaderPage[] = [] + + do { + data = await fetchAirtableData(`https://api.airtable.com/v0/${baseId}/${tableId}`, params, accessToken) + returnPages.push.apply(returnPages, data.records) + params.offset = data.offset + } while (data.offset !== undefined) + + return data.records.map((page) => page.fields) +} + +const loadLimit = async (limit: number, baseId: string, tableId: string, accessToken: string): Promise => { + const params = { maxRecords: limit } + const data = await fetchAirtableData(`https://api.airtable.com/v0/${baseId}/${tableId}`, params, accessToken) + if (data.records.length === 0) { + return [] + } + return data.records.map((page) => page.fields) +} + +module.exports = { nodeClass: Airtable_Agents } diff --git a/packages/components/nodes/agents/AirtableAgent/airtable.svg b/packages/components/nodes/agents/AirtableAgent/airtable.svg new file mode 100644 index 00000000..867c3b5a --- /dev/null +++ b/packages/components/nodes/agents/AirtableAgent/airtable.svg @@ -0,0 +1,9 @@ + + + + + + + + + diff --git a/packages/components/nodes/agents/AirtableAgent/core.ts b/packages/components/nodes/agents/AirtableAgent/core.ts new file mode 100644 index 00000000..450bf5ea --- /dev/null +++ b/packages/components/nodes/agents/AirtableAgent/core.ts @@ -0,0 +1,29 @@ +import type { PyodideInterface } from 'pyodide' +import * as path from 'path' +import { getUserHome } from '../../../src/utils' + +let pyodideInstance: PyodideInterface | undefined + +export async function LoadPyodide(): Promise { + if (pyodideInstance === undefined) { + const { loadPyodide } = await import('pyodide') + const obj: any = { packageCacheDir: path.join(getUserHome(), '.flowise', 'pyodideCacheDir') } + pyodideInstance = await loadPyodide(obj) + await pyodideInstance.loadPackage(['pandas', 'numpy']) + } + + return pyodideInstance +} + +export const systemPrompt = `You are working with a pandas dataframe in Python. The name of the dataframe is df. + +The columns and data types of a dataframe are given below as a Python dictionary with keys showing column names and values showing the data types. +{dict} + +I will ask question, and you will output the Python code using pandas dataframe to answer my question. Do not provide any explanations. Do not respond with anything except the output of the code. + +Question: {question} +Output Code:` + +export const finalSystemPrompt = `You are given the question: {question}. You have an answer to the question: {answer}. Rephrase the answer into a standalone answer. +Standalone Answer:` diff --git a/packages/components/nodes/agents/CSVAgent/CSVAgent.ts b/packages/components/nodes/agents/CSVAgent/CSVAgent.ts index e93b52d3..dae90990 100644 --- a/packages/components/nodes/agents/CSVAgent/CSVAgent.ts +++ b/packages/components/nodes/agents/CSVAgent/CSVAgent.ts @@ -119,7 +119,7 @@ json.dumps(my_dict)` const code = `import pandas as pd\n${pythonCode}` finalResult = await pyodide.runPythonAsync(code) } catch (error) { - throw new Error(pythonCode) + throw new Error(`Sorry, I'm unable to find answer for question: "${input}" using follwoing code: "${pythonCode}"`) } } options.logger.debug('[components/CSVAgent] [3] Pyodide Result =>', finalResult) diff --git a/packages/components/nodes/agents/CSVAgent/core.ts b/packages/components/nodes/agents/CSVAgent/core.ts index 1c193091..450bf5ea 100644 --- a/packages/components/nodes/agents/CSVAgent/core.ts +++ b/packages/components/nodes/agents/CSVAgent/core.ts @@ -9,13 +9,7 @@ export async function LoadPyodide(): Promise { const { loadPyodide } = await import('pyodide') const obj: any = { packageCacheDir: path.join(getUserHome(), '.flowise', 'pyodideCacheDir') } pyodideInstance = await loadPyodide(obj) - await pyodideInstance.loadPackage('micropip') - const micropip = pyodideInstance.pyimport('micropip') - await micropip.install('pandas') - await micropip.install('numpy') - //let mountDir = "/mnt"; - //pyodideInstance.FS.mkdir(mountDir); - //pyodideInstance.FS.mount(pyodideInstance.FS.filesystems.NODEFS, { root: path.join(getUserHome(), '.flowise', 'pyodideFS') }, mountDir); + await pyodideInstance.loadPackage(['pandas', 'numpy']) } return pyodideInstance diff --git a/packages/server/src/utils/index.ts b/packages/server/src/utils/index.ts index 1da6a84e..5fa33cff 100644 --- a/packages/server/src/utils/index.ts +++ b/packages/server/src/utils/index.ts @@ -728,7 +728,7 @@ export const isFlowValidForStream = (reactFlowNodes: IReactFlowNode[], endingNod isValidChainOrAgent = !blacklistChains.includes(endingNodeData.name) } else if (endingNodeData.category === 'Agents') { // Agent that are available to stream - const whitelistAgents = ['openAIFunctionAgent', 'csvAgent'] + const whitelistAgents = ['openAIFunctionAgent', 'csvAgent', 'airtableAgent'] isValidChainOrAgent = whitelistAgents.includes(endingNodeData.name) }