mirror of
https://github.com/farcasclaudiu/Flowise.git
synced 2026-06-28 15:00:57 +03:00
Merge branch 'main' into chore/Upgrade-LC-version
# Conflicts: # packages/components/nodes/cache/RedisCache/RedisCache.ts # packages/components/nodes/cache/RedisCache/RedisEmbeddingsCache.ts # packages/components/nodes/chains/ConversationChain/ConversationChain.ts # packages/components/nodes/tools/RetrieverTool/RetrieverTool.ts # packages/components/nodes/vectorstores/Qdrant/Qdrant.ts # packages/components/nodes/vectorstores/Redis/Redis.ts # packages/components/nodes/vectorstores/Redis/RedisSearchBase.ts # packages/components/nodes/vectorstores/Redis/Redis_Existing.ts # packages/components/nodes/vectorstores/Redis/Redis_Upsert.ts # packages/components/src/agents.ts
This commit is contained in:
@@ -65,7 +65,7 @@ class OpenAIFunctionAgent_Agents implements INode {
|
||||
return prepareAgent(nodeData, { sessionId: this.sessionId, chatId: options.chatId, input }, options.chatHistory)
|
||||
}
|
||||
|
||||
async run(nodeData: INodeData, input: string, options: ICommonObject): Promise<string> {
|
||||
async run(nodeData: INodeData, input: string, options: ICommonObject): Promise<string | ICommonObject> {
|
||||
const memory = nodeData.inputs?.memory as FlowiseMemory
|
||||
const executor = prepareAgent(nodeData, { sessionId: this.sessionId, chatId: options.chatId, input }, options.chatHistory)
|
||||
|
||||
@@ -73,12 +73,20 @@ class OpenAIFunctionAgent_Agents implements INode {
|
||||
const callbacks = await additionalCallbacks(nodeData, options)
|
||||
|
||||
let res: ChainValues = {}
|
||||
let sourceDocuments: ICommonObject[] = []
|
||||
|
||||
if (options.socketIO && options.socketIOClientId) {
|
||||
const handler = new CustomChainHandler(options.socketIO, options.socketIOClientId)
|
||||
res = await executor.invoke({ input }, { callbacks: [loggerHandler, handler, ...callbacks] })
|
||||
if (res.sourceDocuments) {
|
||||
options.socketIO.to(options.socketIOClientId).emit('sourceDocuments', flatten(res.sourceDocuments))
|
||||
sourceDocuments = res.sourceDocuments
|
||||
}
|
||||
} else {
|
||||
res = await executor.invoke({ input }, { callbacks: [loggerHandler, ...callbacks] })
|
||||
if (res.sourceDocuments) {
|
||||
sourceDocuments = res.sourceDocuments
|
||||
}
|
||||
}
|
||||
|
||||
await memory.addChatMessages(
|
||||
@@ -95,7 +103,7 @@ class OpenAIFunctionAgent_Agents implements INode {
|
||||
this.sessionId
|
||||
)
|
||||
|
||||
return res?.output
|
||||
return sourceDocuments.length ? { text: res?.output, sourceDocuments: flatten(sourceDocuments) } : res?.output
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
+41
-4
@@ -1,10 +1,47 @@
|
||||
import { Redis } from 'ioredis'
|
||||
import { Redis, RedisOptions } from 'ioredis'
|
||||
import { isEqual } from 'lodash'
|
||||
import hash from 'object-hash'
|
||||
import { RedisCache as LangchainRedisCache } from '@langchain/community/caches/ioredis'
|
||||
import { StoredGeneration, mapStoredMessageToChatMessage } from '@langchain/core/messages'
|
||||
import { Generation, ChatGeneration } from '@langchain/core/outputs'
|
||||
import { getBaseClasses, getCredentialData, getCredentialParam, ICommonObject, INode, INodeData, INodeParams } from '../../../src'
|
||||
|
||||
let redisClientSingleton: Redis
|
||||
let redisClientOption: RedisOptions
|
||||
let redisClientUrl: string
|
||||
|
||||
const getRedisClientbyOption = (option: RedisOptions) => {
|
||||
if (!redisClientSingleton) {
|
||||
// if client doesn't exists
|
||||
redisClientSingleton = new Redis(option)
|
||||
redisClientOption = option
|
||||
return redisClientSingleton
|
||||
} else if (redisClientSingleton && !isEqual(option, redisClientOption)) {
|
||||
// if client exists but option changed
|
||||
redisClientSingleton.quit()
|
||||
redisClientSingleton = new Redis(option)
|
||||
redisClientOption = option
|
||||
return redisClientSingleton
|
||||
}
|
||||
return redisClientSingleton
|
||||
}
|
||||
|
||||
const getRedisClientbyUrl = (url: string) => {
|
||||
if (!redisClientSingleton) {
|
||||
// if client doesn't exists
|
||||
redisClientSingleton = new Redis(url)
|
||||
redisClientUrl = url
|
||||
return redisClientSingleton
|
||||
} else if (redisClientSingleton && url !== redisClientUrl) {
|
||||
// if client exists but option changed
|
||||
redisClientSingleton.quit()
|
||||
redisClientSingleton = new Redis(url)
|
||||
redisClientUrl = url
|
||||
return redisClientSingleton
|
||||
}
|
||||
return redisClientSingleton
|
||||
}
|
||||
|
||||
class RedisCache implements INode {
|
||||
label: string
|
||||
name: string
|
||||
@@ -61,7 +98,7 @@ class RedisCache implements INode {
|
||||
|
||||
const tlsOptions = sslEnabled === true ? { tls: { rejectUnauthorized: false } } : {}
|
||||
|
||||
client = new Redis({
|
||||
client = getRedisClientbyOption({
|
||||
port: portStr ? parseInt(portStr) : 6379,
|
||||
host,
|
||||
username,
|
||||
@@ -69,7 +106,7 @@ class RedisCache implements INode {
|
||||
...tlsOptions
|
||||
})
|
||||
} else {
|
||||
client = new Redis(redisUrl)
|
||||
client = getRedisClientbyUrl(redisUrl)
|
||||
}
|
||||
|
||||
const redisClient = new LangchainRedisCache(client)
|
||||
@@ -95,7 +132,7 @@ class RedisCache implements INode {
|
||||
for (let i = 0; i < value.length; i += 1) {
|
||||
const key = getCacheKey(prompt, llmKey, String(i))
|
||||
if (ttl) {
|
||||
await client.set(key, JSON.stringify(serializeGeneration(value[i])), 'EX', parseInt(ttl, 10))
|
||||
await client.set(key, JSON.stringify(serializeGeneration(value[i])), 'PX', parseInt(ttl, 10))
|
||||
} else {
|
||||
await client.set(key, JSON.stringify(serializeGeneration(value[i])))
|
||||
}
|
||||
|
||||
@@ -1,9 +1,46 @@
|
||||
import { Redis } from 'ioredis'
|
||||
import { Redis, RedisOptions } from 'ioredis'
|
||||
import { isEqual } from 'lodash'
|
||||
import { RedisByteStore } from '@langchain/community/storage/ioredis'
|
||||
import { Embeddings } from '@langchain/core/embeddings'
|
||||
import { CacheBackedEmbeddings } from 'langchain/embeddings/cache_backed'
|
||||
import { getBaseClasses, getCredentialData, getCredentialParam, ICommonObject, INode, INodeData, INodeParams } from '../../../src'
|
||||
|
||||
let redisClientSingleton: Redis
|
||||
let redisClientOption: RedisOptions
|
||||
let redisClientUrl: string
|
||||
|
||||
const getRedisClientbyOption = (option: RedisOptions) => {
|
||||
if (!redisClientSingleton) {
|
||||
// if client doesn't exists
|
||||
redisClientSingleton = new Redis(option)
|
||||
redisClientOption = option
|
||||
return redisClientSingleton
|
||||
} else if (redisClientSingleton && !isEqual(option, redisClientOption)) {
|
||||
// if client exists but option changed
|
||||
redisClientSingleton.quit()
|
||||
redisClientSingleton = new Redis(option)
|
||||
redisClientOption = option
|
||||
return redisClientSingleton
|
||||
}
|
||||
return redisClientSingleton
|
||||
}
|
||||
|
||||
const getRedisClientbyUrl = (url: string) => {
|
||||
if (!redisClientSingleton) {
|
||||
// if client doesn't exists
|
||||
redisClientSingleton = new Redis(url)
|
||||
redisClientUrl = url
|
||||
return redisClientSingleton
|
||||
} else if (redisClientSingleton && url !== redisClientUrl) {
|
||||
// if client exists but option changed
|
||||
redisClientSingleton.quit()
|
||||
redisClientSingleton = new Redis(url)
|
||||
redisClientUrl = url
|
||||
return redisClientSingleton
|
||||
}
|
||||
return redisClientSingleton
|
||||
}
|
||||
|
||||
class RedisEmbeddingsCache implements INode {
|
||||
label: string
|
||||
name: string
|
||||
@@ -75,7 +112,7 @@ class RedisEmbeddingsCache implements INode {
|
||||
|
||||
const tlsOptions = sslEnabled === true ? { tls: { rejectUnauthorized: false } } : {}
|
||||
|
||||
client = new Redis({
|
||||
client = getRedisClientbyOption({
|
||||
port: portStr ? parseInt(portStr) : 6379,
|
||||
host,
|
||||
username,
|
||||
@@ -83,7 +120,7 @@ class RedisEmbeddingsCache implements INode {
|
||||
...tlsOptions
|
||||
})
|
||||
} else {
|
||||
client = new Redis(redisUrl)
|
||||
client = getRedisClientbyUrl(redisUrl)
|
||||
}
|
||||
|
||||
ttl ??= '3600'
|
||||
|
||||
@@ -7,6 +7,8 @@ import { ConversationChain } from 'langchain/chains'
|
||||
import { FlowiseMemory, ICommonObject, IMessage, INode, INodeData, INodeParams } from '../../../src/Interface'
|
||||
import { ConsoleCallbackHandler, CustomChainHandler, additionalCallbacks } from '../../../src/handler'
|
||||
import { getBaseClasses, handleEscapeCharacters } from '../../../src/utils'
|
||||
import { checkInputs, Moderation, streamResponse } from '../../moderation/Moderation'
|
||||
import { formatResponse } from '../../outputparsers/OutputParserHelpers'
|
||||
|
||||
let systemMessage = `The following is a friendly conversation between a human and an AI. The AI is talkative and provides lots of specific details from its context. If the AI does not know the answer to a question, it truthfully says it does not know.`
|
||||
const inputKey = 'input'
|
||||
@@ -26,7 +28,7 @@ class ConversationChain_Chains implements INode {
|
||||
constructor(fields?: { sessionId?: string }) {
|
||||
this.label = 'Conversation Chain'
|
||||
this.name = 'conversationChain'
|
||||
this.version = 2.0
|
||||
this.version = 3.0
|
||||
this.type = 'ConversationChain'
|
||||
this.icon = 'conv.svg'
|
||||
this.category = 'Chains'
|
||||
@@ -60,6 +62,14 @@ class ConversationChain_Chains implements INode {
|
||||
optional: true,
|
||||
list: true
|
||||
},*/
|
||||
{
|
||||
label: 'Input Moderation',
|
||||
description: 'Detect text that could generate harmful output and prevent it from being sent to the language model',
|
||||
name: 'inputModeration',
|
||||
type: 'Moderation',
|
||||
optional: true,
|
||||
list: true
|
||||
},
|
||||
{
|
||||
label: 'System Message',
|
||||
name: 'systemMessagePrompt',
|
||||
@@ -80,8 +90,21 @@ class ConversationChain_Chains implements INode {
|
||||
return chain
|
||||
}
|
||||
|
||||
async run(nodeData: INodeData, input: string, options: ICommonObject): Promise<string> {
|
||||
async run(nodeData: INodeData, input: string, options: ICommonObject): Promise<string | object> {
|
||||
const memory = nodeData.inputs?.memory
|
||||
const moderations = nodeData.inputs?.inputModeration as Moderation[]
|
||||
|
||||
if (moderations && moderations.length > 0) {
|
||||
try {
|
||||
// Use the output of the moderation chain as input for the LLM chain
|
||||
input = await checkInputs(moderations, input)
|
||||
} catch (e) {
|
||||
await new Promise((resolve) => setTimeout(resolve, 500))
|
||||
streamResponse(options.socketIO && options.socketIOClientId, e.message, options.socketIO, options.socketIOClientId)
|
||||
return formatResponse(e.message)
|
||||
}
|
||||
}
|
||||
|
||||
const chain = prepareChain(nodeData, this.sessionId, options.chatHistory)
|
||||
|
||||
const loggerHandler = new ConsoleCallbackHandler(options.logger)
|
||||
|
||||
@@ -19,7 +19,7 @@ class ChatOpenAI_ChatModels implements INode {
|
||||
constructor() {
|
||||
this.label = 'ChatOpenAI'
|
||||
this.name = 'chatOpenAI'
|
||||
this.version = 2.0
|
||||
this.version = 3.0
|
||||
this.type = 'ChatOpenAI'
|
||||
this.icon = 'openai.svg'
|
||||
this.category = 'Chat Models'
|
||||
|
||||
@@ -20,7 +20,7 @@ class Airtable_DocumentLoaders implements INode {
|
||||
constructor() {
|
||||
this.label = 'Airtable'
|
||||
this.name = 'airtable'
|
||||
this.version = 2.0
|
||||
this.version = 3.0
|
||||
this.type = 'Document'
|
||||
this.icon = 'airtable.svg'
|
||||
this.category = 'Document Loaders'
|
||||
@@ -64,10 +64,21 @@ class Airtable_DocumentLoaders implements INode {
|
||||
'If your view URL looks like: https://airtable.com/app11RobdGoX0YNsC/tblJdmvbrgizbYICO/viw9UrP77Id0CE4ee, viw9UrP77Id0CE4ee is the view id',
|
||||
optional: true
|
||||
},
|
||||
{
|
||||
label: 'Include Only Fields',
|
||||
name: 'fields',
|
||||
type: 'string',
|
||||
placeholder: 'Name, Assignee, fld1u0qUz0SoOQ9Gg, fldew39v6LBN5CjUl',
|
||||
optional: true,
|
||||
additionalParams: true,
|
||||
description:
|
||||
'Comma-separated list of field names or IDs to include. If empty, then ALL fields are used. Use field IDs if field names contain commas.'
|
||||
},
|
||||
{
|
||||
label: 'Return All',
|
||||
name: 'returnAll',
|
||||
type: 'boolean',
|
||||
optional: true,
|
||||
default: true,
|
||||
additionalParams: true,
|
||||
description: 'If all results should be returned or only up to a given limit'
|
||||
@@ -76,9 +87,10 @@ class Airtable_DocumentLoaders implements INode {
|
||||
label: 'Limit',
|
||||
name: 'limit',
|
||||
type: 'number',
|
||||
optional: true,
|
||||
default: 100,
|
||||
additionalParams: true,
|
||||
description: 'Number of results to return'
|
||||
description: 'Number of results to return. Ignored when Return All is enabled.'
|
||||
},
|
||||
{
|
||||
label: 'Metadata',
|
||||
@@ -93,6 +105,8 @@ class Airtable_DocumentLoaders implements INode {
|
||||
const baseId = nodeData.inputs?.baseId as string
|
||||
const tableId = nodeData.inputs?.tableId as string
|
||||
const viewId = nodeData.inputs?.viewId as string
|
||||
const fieldsInput = nodeData.inputs?.fields as string
|
||||
const fields = fieldsInput ? fieldsInput.split(',').map((field) => field.trim()) : []
|
||||
const returnAll = nodeData.inputs?.returnAll as boolean
|
||||
const limit = nodeData.inputs?.limit as string
|
||||
const textSplitter = nodeData.inputs?.textSplitter as TextSplitter
|
||||
@@ -105,6 +119,7 @@ class Airtable_DocumentLoaders implements INode {
|
||||
baseId,
|
||||
tableId,
|
||||
viewId,
|
||||
fields,
|
||||
returnAll,
|
||||
accessToken,
|
||||
limit: limit ? parseInt(limit, 10) : 100
|
||||
@@ -112,6 +127,10 @@ class Airtable_DocumentLoaders implements INode {
|
||||
|
||||
const loader = new AirtableLoader(airtableOptions)
|
||||
|
||||
if (!baseId || !tableId) {
|
||||
throw new Error('Base ID and Table ID must be provided.')
|
||||
}
|
||||
|
||||
let docs = []
|
||||
|
||||
if (textSplitter) {
|
||||
@@ -145,10 +164,18 @@ interface AirtableLoaderParams {
|
||||
tableId: string
|
||||
accessToken: string
|
||||
viewId?: string
|
||||
fields?: string[]
|
||||
limit?: number
|
||||
returnAll?: boolean
|
||||
}
|
||||
|
||||
interface AirtableLoaderRequest {
|
||||
maxRecords?: number
|
||||
view: string | undefined
|
||||
fields?: string[]
|
||||
offset?: string
|
||||
}
|
||||
|
||||
interface AirtableLoaderResponse {
|
||||
records: AirtableLoaderPage[]
|
||||
offset?: string
|
||||
@@ -167,17 +194,20 @@ class AirtableLoader extends BaseDocumentLoader {
|
||||
|
||||
public readonly viewId?: string
|
||||
|
||||
public readonly fields: string[]
|
||||
|
||||
public readonly accessToken: string
|
||||
|
||||
public readonly limit: number
|
||||
|
||||
public readonly returnAll: boolean
|
||||
|
||||
constructor({ baseId, tableId, viewId, accessToken, limit = 100, returnAll = false }: AirtableLoaderParams) {
|
||||
constructor({ baseId, tableId, viewId, fields = [], accessToken, limit = 100, returnAll = false }: AirtableLoaderParams) {
|
||||
super()
|
||||
this.baseId = baseId
|
||||
this.tableId = tableId
|
||||
this.viewId = viewId
|
||||
this.fields = fields
|
||||
this.accessToken = accessToken
|
||||
this.limit = limit
|
||||
this.returnAll = returnAll
|
||||
@@ -190,17 +220,21 @@ class AirtableLoader extends BaseDocumentLoader {
|
||||
return this.loadLimit()
|
||||
}
|
||||
|
||||
protected async fetchAirtableData(url: string, params: ICommonObject): Promise<AirtableLoaderResponse> {
|
||||
protected async fetchAirtableData(url: string, data: AirtableLoaderRequest): Promise<AirtableLoaderResponse> {
|
||||
try {
|
||||
const headers = {
|
||||
Authorization: `Bearer ${this.accessToken}`,
|
||||
'Content-Type': 'application/json',
|
||||
Accept: 'application/json'
|
||||
}
|
||||
const response = await axios.get(url, { params, headers })
|
||||
const response = await axios.post(url, data, { headers })
|
||||
return response.data
|
||||
} catch (error) {
|
||||
throw new Error(`Failed to fetch ${url} from Airtable: ${error}`)
|
||||
if (axios.isAxiosError(error)) {
|
||||
throw new Error(`Failed to fetch ${url} from Airtable: ${error.message}, status: ${error.response?.status}`)
|
||||
} else {
|
||||
throw new Error(`Failed to fetch ${url} from Airtable: ${error}`)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -218,24 +252,53 @@ class AirtableLoader extends BaseDocumentLoader {
|
||||
}
|
||||
|
||||
private async loadLimit(): Promise<Document[]> {
|
||||
const params = { maxRecords: this.limit, view: this.viewId }
|
||||
const data = await this.fetchAirtableData(`https://api.airtable.com/v0/${this.baseId}/${this.tableId}`, params)
|
||||
if (data.records.length === 0) {
|
||||
return []
|
||||
let data: AirtableLoaderRequest = {
|
||||
maxRecords: this.limit,
|
||||
view: this.viewId
|
||||
}
|
||||
return data.records.map((page) => this.createDocumentFromPage(page))
|
||||
|
||||
if (this.fields.length > 0) {
|
||||
data.fields = this.fields
|
||||
}
|
||||
|
||||
let response: AirtableLoaderResponse
|
||||
let returnPages: AirtableLoaderPage[] = []
|
||||
|
||||
// Paginate if the user specifies a limit > 100 (like 200) but not return all.
|
||||
do {
|
||||
response = await this.fetchAirtableData(`https://api.airtable.com/v0/${this.baseId}/${this.tableId}/listRecords`, data)
|
||||
returnPages.push(...response.records)
|
||||
data.offset = response.offset
|
||||
|
||||
// Stop if we have fetched enough records
|
||||
if (returnPages.length >= this.limit) break
|
||||
} while (response.offset !== undefined)
|
||||
|
||||
// Truncate array to the limit if necessary
|
||||
if (returnPages.length > this.limit) {
|
||||
returnPages.length = this.limit
|
||||
}
|
||||
|
||||
return returnPages.map((page) => this.createDocumentFromPage(page))
|
||||
}
|
||||
|
||||
private async loadAll(): Promise<Document[]> {
|
||||
const params: ICommonObject = { pageSize: 100, view: this.viewId }
|
||||
let data: AirtableLoaderResponse
|
||||
let data: AirtableLoaderRequest = {
|
||||
view: this.viewId
|
||||
}
|
||||
|
||||
if (this.fields.length > 0) {
|
||||
data.fields = this.fields
|
||||
}
|
||||
|
||||
let response: AirtableLoaderResponse
|
||||
let returnPages: AirtableLoaderPage[] = []
|
||||
|
||||
do {
|
||||
data = await this.fetchAirtableData(`https://api.airtable.com/v0/${this.baseId}/${this.tableId}`, params)
|
||||
returnPages.push.apply(returnPages, data.records)
|
||||
params.offset = data.offset
|
||||
} while (data.offset !== undefined)
|
||||
response = await this.fetchAirtableData(`https://api.airtable.com/v0/${this.baseId}/${this.tableId}/listRecords`, data)
|
||||
returnPages.push(...response.records)
|
||||
data.offset = response.offset
|
||||
} while (response.offset !== undefined)
|
||||
return returnPages.map((page) => this.createDocumentFromPage(page))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -34,7 +34,7 @@ class AzureOpenAIEmbedding_Embeddings implements INode {
|
||||
label: 'Batch Size',
|
||||
name: 'batchSize',
|
||||
type: 'number',
|
||||
default: '1',
|
||||
default: '100',
|
||||
optional: true,
|
||||
additionalParams: true
|
||||
},
|
||||
|
||||
@@ -17,7 +17,7 @@ class OpenAIEmbedding_Embeddings implements INode {
|
||||
constructor() {
|
||||
this.label = 'OpenAI Embeddings'
|
||||
this.name = 'openAIEmbeddings'
|
||||
this.version = 1.0
|
||||
this.version = 2.0
|
||||
this.type = 'OpenAIEmbeddings'
|
||||
this.icon = 'openai.svg'
|
||||
this.category = 'Embeddings'
|
||||
@@ -30,6 +30,27 @@ class OpenAIEmbedding_Embeddings implements INode {
|
||||
credentialNames: ['openAIApi']
|
||||
}
|
||||
this.inputs = [
|
||||
{
|
||||
label: 'Model Name',
|
||||
name: 'modelName',
|
||||
type: 'options',
|
||||
options: [
|
||||
{
|
||||
label: 'text-embedding-3-large',
|
||||
name: 'text-embedding-3-large'
|
||||
},
|
||||
{
|
||||
label: 'text-embedding-3-small',
|
||||
name: 'text-embedding-3-small'
|
||||
},
|
||||
{
|
||||
label: 'text-embedding-ada-002',
|
||||
name: 'text-embedding-ada-002'
|
||||
}
|
||||
],
|
||||
default: 'text-embedding-ada-002',
|
||||
optional: true
|
||||
},
|
||||
{
|
||||
label: 'Strip New Lines',
|
||||
name: 'stripNewLines',
|
||||
@@ -66,12 +87,14 @@ class OpenAIEmbedding_Embeddings implements INode {
|
||||
const batchSize = nodeData.inputs?.batchSize as string
|
||||
const timeout = nodeData.inputs?.timeout as string
|
||||
const basePath = nodeData.inputs?.basepath as string
|
||||
const modelName = nodeData.inputs?.modelName as string
|
||||
|
||||
const credentialData = await getCredentialData(nodeData.credential ?? '', options)
|
||||
const openAIApiKey = getCredentialParam('openAIApiKey', credentialData, nodeData)
|
||||
|
||||
const obj: Partial<OpenAIEmbeddingsParams> & { openAIApiKey?: string } = {
|
||||
openAIApiKey
|
||||
openAIApiKey,
|
||||
modelName
|
||||
}
|
||||
|
||||
if (stripNewLines) obj.stripNewLines = stripNewLines
|
||||
|
||||
@@ -5,6 +5,24 @@ import { mapStoredMessageToChatMessage, AIMessage, HumanMessage, BaseMessage } f
|
||||
import { convertBaseMessagetoIMessage, getBaseClasses, getCredentialData, getCredentialParam } from '../../../src/utils'
|
||||
import { FlowiseMemory, ICommonObject, IMessage, INode, INodeData, INodeParams, MemoryMethods, MessageType } from '../../../src/Interface'
|
||||
|
||||
let mongoClientSingleton: MongoClient
|
||||
let mongoUrl: string
|
||||
|
||||
const getMongoClient = async (newMongoUrl: string) => {
|
||||
if (!mongoClientSingleton) {
|
||||
// if client doesn't exists
|
||||
mongoClientSingleton = new MongoClient(newMongoUrl)
|
||||
mongoUrl = newMongoUrl
|
||||
return mongoClientSingleton
|
||||
} else if (mongoClientSingleton && newMongoUrl !== mongoUrl) {
|
||||
// if client exists but url changed
|
||||
mongoClientSingleton.close()
|
||||
mongoClientSingleton = new MongoClient(newMongoUrl)
|
||||
mongoUrl = newMongoUrl
|
||||
return mongoClientSingleton
|
||||
}
|
||||
return mongoClientSingleton
|
||||
}
|
||||
class MongoDB_Memory implements INode {
|
||||
label: string
|
||||
name: string
|
||||
@@ -79,9 +97,7 @@ const initializeMongoDB = async (nodeData: INodeData, options: ICommonObject): P
|
||||
const credentialData = await getCredentialData(nodeData.credential ?? '', options)
|
||||
const mongoDBConnectUrl = getCredentialParam('mongoDBConnectUrl', credentialData, nodeData)
|
||||
|
||||
const client = new MongoClient(mongoDBConnectUrl)
|
||||
await client.connect()
|
||||
|
||||
const client = await getMongoClient(mongoDBConnectUrl)
|
||||
const collection = client.db(databaseName).collection(collectionName)
|
||||
|
||||
const mongoDBChatMessageHistory = new MongoDBChatMessageHistory({
|
||||
|
||||
@@ -1,10 +1,47 @@
|
||||
import { Redis } from 'ioredis'
|
||||
import { Redis, RedisOptions } from 'ioredis'
|
||||
import { isEqual } from 'lodash'
|
||||
import { BufferMemory, BufferMemoryInput } from 'langchain/memory'
|
||||
import { RedisChatMessageHistory, RedisChatMessageHistoryInput } from '@langchain/community/stores/message/ioredis'
|
||||
import { mapStoredMessageToChatMessage, BaseMessage, AIMessage, HumanMessage } from '@langchain/core/messages'
|
||||
import { INode, INodeData, INodeParams, ICommonObject, MessageType, IMessage, MemoryMethods, FlowiseMemory } from '../../../src/Interface'
|
||||
import { convertBaseMessagetoIMessage, getBaseClasses, getCredentialData, getCredentialParam } from '../../../src/utils'
|
||||
|
||||
let redisClientSingleton: Redis
|
||||
let redisClientOption: RedisOptions
|
||||
let redisClientUrl: string
|
||||
|
||||
const getRedisClientbyOption = (option: RedisOptions) => {
|
||||
if (!redisClientSingleton) {
|
||||
// if client doesn't exists
|
||||
redisClientSingleton = new Redis(option)
|
||||
redisClientOption = option
|
||||
return redisClientSingleton
|
||||
} else if (redisClientSingleton && !isEqual(option, redisClientOption)) {
|
||||
// if client exists but option changed
|
||||
redisClientSingleton.quit()
|
||||
redisClientSingleton = new Redis(option)
|
||||
redisClientOption = option
|
||||
return redisClientSingleton
|
||||
}
|
||||
return redisClientSingleton
|
||||
}
|
||||
|
||||
const getRedisClientbyUrl = (url: string) => {
|
||||
if (!redisClientSingleton) {
|
||||
// if client doesn't exists
|
||||
redisClientSingleton = new Redis(url)
|
||||
redisClientUrl = url
|
||||
return redisClientSingleton
|
||||
} else if (redisClientSingleton && url !== redisClientUrl) {
|
||||
// if client exists but option changed
|
||||
redisClientSingleton.quit()
|
||||
redisClientSingleton = new Redis(url)
|
||||
redisClientUrl = url
|
||||
return redisClientSingleton
|
||||
}
|
||||
return redisClientSingleton
|
||||
}
|
||||
|
||||
class RedisBackedChatMemory_Memory implements INode {
|
||||
label: string
|
||||
name: string
|
||||
@@ -95,7 +132,7 @@ const initalizeRedis = async (nodeData: INodeData, options: ICommonObject): Prom
|
||||
|
||||
const tlsOptions = sslEnabled === true ? { tls: { rejectUnauthorized: false } } : {}
|
||||
|
||||
client = new Redis({
|
||||
client = getRedisClientbyOption({
|
||||
port: portStr ? parseInt(portStr) : 6379,
|
||||
host,
|
||||
username,
|
||||
@@ -103,7 +140,7 @@ const initalizeRedis = async (nodeData: INodeData, options: ICommonObject): Prom
|
||||
...tlsOptions
|
||||
})
|
||||
} else {
|
||||
client = new Redis(redisUrl)
|
||||
client = getRedisClientbyUrl(redisUrl)
|
||||
}
|
||||
|
||||
let obj: RedisChatMessageHistoryInput = {
|
||||
@@ -120,24 +157,6 @@ const initalizeRedis = async (nodeData: INodeData, options: ICommonObject): Prom
|
||||
|
||||
const redisChatMessageHistory = new RedisChatMessageHistory(obj)
|
||||
|
||||
/*redisChatMessageHistory.getMessages = async (): Promise<BaseMessage[]> => {
|
||||
const rawStoredMessages = await client.lrange((redisChatMessageHistory as any).sessionId, windowSize ? -windowSize : 0, -1)
|
||||
const orderedMessages = rawStoredMessages.reverse().map((message) => JSON.parse(message))
|
||||
return orderedMessages.map(mapStoredMessageToChatMessage)
|
||||
}
|
||||
|
||||
redisChatMessageHistory.addMessage = async (message: BaseMessage): Promise<void> => {
|
||||
const messageToAdd = [message].map((msg) => msg.toDict())
|
||||
await client.lpush((redisChatMessageHistory as any).sessionId, JSON.stringify(messageToAdd[0]))
|
||||
if (sessionTTL) {
|
||||
await client.expire((redisChatMessageHistory as any).sessionId, sessionTTL)
|
||||
}
|
||||
}
|
||||
|
||||
redisChatMessageHistory.clear = async (): Promise<void> => {
|
||||
await client.del((redisChatMessageHistory as any).sessionId)
|
||||
}*/
|
||||
|
||||
const memory = new BufferMemoryExtended({
|
||||
memoryKey: memoryKey ?? 'chat_history',
|
||||
chatHistory: redisChatMessageHistory,
|
||||
|
||||
+8
-6
@@ -28,16 +28,17 @@ class CustomListOutputParser implements INode {
|
||||
label: 'Length',
|
||||
name: 'length',
|
||||
type: 'number',
|
||||
default: 5,
|
||||
step: 1,
|
||||
description: 'Number of values to return'
|
||||
description: 'Number of values to return',
|
||||
optional: true
|
||||
},
|
||||
{
|
||||
label: 'Separator',
|
||||
name: 'separator',
|
||||
type: 'string',
|
||||
description: 'Separator between values',
|
||||
default: ','
|
||||
default: ',',
|
||||
optional: true
|
||||
},
|
||||
{
|
||||
label: 'Autofix',
|
||||
@@ -53,10 +54,11 @@ class CustomListOutputParser implements INode {
|
||||
const separator = nodeData.inputs?.separator as string
|
||||
const lengthStr = nodeData.inputs?.length as string
|
||||
const autoFix = nodeData.inputs?.autofixParser as boolean
|
||||
let length = 5
|
||||
if (lengthStr) length = parseInt(lengthStr, 10)
|
||||
|
||||
const parser = new LangchainCustomListOutputParser({ length: length, separator: separator })
|
||||
const parser = new LangchainCustomListOutputParser({
|
||||
length: lengthStr ? parseInt(lengthStr, 10) : undefined,
|
||||
separator: separator
|
||||
})
|
||||
Object.defineProperty(parser, 'autoFix', {
|
||||
enumerable: true,
|
||||
configurable: true,
|
||||
|
||||
@@ -1,8 +1,11 @@
|
||||
import { z } from 'zod'
|
||||
import { DynamicStructuredTool } from '@langchain/core/tools'
|
||||
import { CallbackManagerForToolRun } from '@langchain/core/callbacks/manager'
|
||||
import { DynamicTool } from '@langchain/core/tools'
|
||||
import { BaseRetriever } from '@langchain/core/retrievers'
|
||||
import { createRetrieverTool } from 'langchain/tools/retriever'
|
||||
import { INode, INodeData, INodeParams } from '../../../src/Interface'
|
||||
import { getBaseClasses } from '../../../src/utils'
|
||||
import { SOURCE_DOCUMENTS_PREFIX } from '../../../src/agents'
|
||||
|
||||
class Retriever_Tools implements INode {
|
||||
label: string
|
||||
@@ -19,7 +22,7 @@ class Retriever_Tools implements INode {
|
||||
constructor() {
|
||||
this.label = 'Retriever Tool'
|
||||
this.name = 'retrieverTool'
|
||||
this.version = 1.0
|
||||
this.version = 2.0
|
||||
this.type = 'RetrieverTool'
|
||||
this.icon = 'retrievertool.svg'
|
||||
this.category = 'Tools'
|
||||
@@ -44,6 +47,12 @@ class Retriever_Tools implements INode {
|
||||
label: 'Retriever',
|
||||
name: 'retriever',
|
||||
type: 'BaseRetriever'
|
||||
},
|
||||
{
|
||||
label: 'Return Source Documents',
|
||||
name: 'returnSourceDocuments',
|
||||
type: 'boolean',
|
||||
optional: true
|
||||
}
|
||||
]
|
||||
}
|
||||
@@ -52,12 +61,25 @@ class Retriever_Tools implements INode {
|
||||
const name = nodeData.inputs?.name as string
|
||||
const description = nodeData.inputs?.description as string
|
||||
const retriever = nodeData.inputs?.retriever as BaseRetriever
|
||||
const returnSourceDocuments = nodeData.inputs?.returnSourceDocuments as boolean
|
||||
|
||||
const tool = createRetrieverTool(retriever, {
|
||||
const input = {
|
||||
name,
|
||||
description
|
||||
}
|
||||
|
||||
const func = async ({ input }: { input: string }, runManager?: CallbackManagerForToolRun) => {
|
||||
const docs = await retriever.getRelevantDocuments(input, runManager?.getChild('retriever'))
|
||||
const content = docs.map((doc) => doc.pageContent).join('\n\n')
|
||||
const sourceDocuments = JSON.stringify(docs)
|
||||
return returnSourceDocuments ? content + SOURCE_DOCUMENTS_PREFIX + sourceDocuments : content
|
||||
}
|
||||
|
||||
const schema = z.object({
|
||||
input: z.string().describe('query to look up in retriever')
|
||||
})
|
||||
|
||||
const tool = new DynamicStructuredTool({ ...input, func, schema })
|
||||
return tool
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import { flatten } from 'lodash'
|
||||
import { createClient, SearchOptions } from 'redis'
|
||||
import { flatten, isEqual } from 'lodash'
|
||||
import { createClient, SearchOptions, RedisClientOptions } from 'redis'
|
||||
import { Embeddings } from '@langchain/core/embeddings'
|
||||
import { RedisVectorStore, RedisVectorStoreConfig } from '@langchain/community/vectorstores/redis'
|
||||
import { Document } from '@langchain/core/documents'
|
||||
@@ -7,6 +7,27 @@ import { ICommonObject, INode, INodeData, INodeOutputsValue, INodeParams } from
|
||||
import { getBaseClasses, getCredentialData, getCredentialParam } from '../../../src/utils'
|
||||
import { escapeAllStrings, escapeSpecialChars, unEscapeSpecialChars } from './utils'
|
||||
|
||||
let redisClientSingleton: ReturnType<typeof createClient>
|
||||
let redisClientOption: RedisClientOptions
|
||||
|
||||
const getRedisClient = async (option: RedisClientOptions) => {
|
||||
if (!redisClientSingleton) {
|
||||
// if client doesn't exists
|
||||
redisClientSingleton = createClient(option)
|
||||
await redisClientSingleton.connect()
|
||||
redisClientOption = option
|
||||
return redisClientSingleton
|
||||
} else if (redisClientSingleton && !isEqual(option, redisClientOption)) {
|
||||
// if client exists but option changed
|
||||
redisClientSingleton.quit()
|
||||
redisClientSingleton = createClient(option)
|
||||
await redisClientSingleton.connect()
|
||||
redisClientOption = option
|
||||
return redisClientSingleton
|
||||
}
|
||||
return redisClientSingleton
|
||||
}
|
||||
|
||||
class Redis_VectorStores implements INode {
|
||||
label: string
|
||||
name: string
|
||||
@@ -149,8 +170,7 @@ class Redis_VectorStores implements INode {
|
||||
}
|
||||
|
||||
try {
|
||||
const redisClient = createClient({ url: redisUrl })
|
||||
await redisClient.connect()
|
||||
const redisClient = await getRedisClient({ url: redisUrl })
|
||||
|
||||
const storeConfig: RedisVectorStoreConfig = {
|
||||
redisClient: redisClient,
|
||||
@@ -210,8 +230,7 @@ class Redis_VectorStores implements INode {
|
||||
redisUrl = 'redis://' + username + ':' + password + '@' + host + ':' + portStr
|
||||
}
|
||||
|
||||
const redisClient = createClient({ url: redisUrl })
|
||||
await redisClient.connect()
|
||||
const redisClient = await getRedisClient({ url: redisUrl })
|
||||
|
||||
const storeConfig: RedisVectorStoreConfig = {
|
||||
redisClient: redisClient,
|
||||
|
||||
@@ -1,3 +1,10 @@
|
||||
import { createClient, SearchOptions, RedisClientOptions } from 'redis'
|
||||
import { isEqual } from 'lodash'
|
||||
import { Embeddings } from '@langchain/core/embeddings'
|
||||
import { VectorStore } from '@langchain/core/vectorstores'
|
||||
import { Document } from '@langchain/core/documents'
|
||||
import { RedisVectorStore } from '@langchain/community/vectorstores/redis'
|
||||
import { escapeSpecialChars, unEscapeSpecialChars } from './utils'
|
||||
import {
|
||||
getBaseClasses,
|
||||
getCredentialData,
|
||||
@@ -7,12 +14,27 @@ import {
|
||||
INodeOutputsValue,
|
||||
INodeParams
|
||||
} from '../../../src'
|
||||
import { Embeddings } from '@langchain/core/embeddings'
|
||||
import { VectorStore } from '@langchain/core/vectorstores'
|
||||
import { Document } from '@langchain/core/documents'
|
||||
import { createClient, SearchOptions } from 'redis'
|
||||
import { RedisVectorStore } from '@langchain/community/vectorstores/redis'
|
||||
import { escapeSpecialChars, unEscapeSpecialChars } from './utils'
|
||||
|
||||
let redisClientSingleton: ReturnType<typeof createClient>
|
||||
let redisClientOption: RedisClientOptions
|
||||
|
||||
const getRedisClient = async (option: RedisClientOptions) => {
|
||||
if (!redisClientSingleton) {
|
||||
// if client doesn't exists
|
||||
redisClientSingleton = createClient(option)
|
||||
await redisClientSingleton.connect()
|
||||
redisClientOption = option
|
||||
return redisClientSingleton
|
||||
} else if (redisClientSingleton && !isEqual(option, redisClientOption)) {
|
||||
// if client exists but option changed
|
||||
redisClientSingleton.quit()
|
||||
redisClientSingleton = createClient(option)
|
||||
await redisClientSingleton.connect()
|
||||
redisClientOption = option
|
||||
return redisClientSingleton
|
||||
}
|
||||
return redisClientSingleton
|
||||
}
|
||||
|
||||
export abstract class RedisSearchBase {
|
||||
label: string
|
||||
@@ -140,8 +162,7 @@ export abstract class RedisSearchBase {
|
||||
redisUrl = 'redis://' + username + ':' + password + '@' + host + ':' + portStr
|
||||
}
|
||||
|
||||
this.redisClient = createClient({ url: redisUrl })
|
||||
await this.redisClient.connect()
|
||||
this.redisClient = await getRedisClient({ url: redisUrl })
|
||||
|
||||
const vectorStore = await this.constructVectorStore(embeddings, indexName, replaceIndex, docs)
|
||||
if (!contentKey || contentKey === '') contentKey = 'content'
|
||||
|
||||
Reference in New Issue
Block a user