mirror of
https://github.com/farcasclaudiu/Flowise.git
synced 2026-06-28 15:00:57 +03:00
S3 Directory Document Loading Component (#2818)
* Add new S3Directory component * Add Additional Metadata and Omit Metadata Keys parameters * Update S3Directory.ts add placeholder for prefix --------- Co-authored-by: Scott Laplante <scott.laplante@Scotts-MacBook-Pro.local> Co-authored-by: Henry Heng <henryheng@flowiseai.com>
This commit is contained in:
@@ -0,0 +1,321 @@
|
||||
import { omit } from 'lodash'
|
||||
import { ICommonObject, INode, INodeData, INodeOptionsValue, INodeParams } from '../../../src/Interface'
|
||||
import { getCredentialData, getCredentialParam } from '../../../src/utils'
|
||||
import { S3Client, GetObjectCommand, S3ClientConfig, ListObjectsV2Command, ListObjectsV2Output } from '@aws-sdk/client-s3'
|
||||
import { getRegions, MODEL_TYPE } from '../../../src/modelLoader'
|
||||
import { Readable } from 'node:stream'
|
||||
import * as fsDefault from 'node:fs'
|
||||
import * as path from 'node:path'
|
||||
import * as os from 'node:os'
|
||||
|
||||
import { DirectoryLoader } from 'langchain/document_loaders/fs/directory'
|
||||
import { JSONLoader } from 'langchain/document_loaders/fs/json'
|
||||
import { CSVLoader } from 'langchain/document_loaders/fs/csv'
|
||||
import { PDFLoader } from 'langchain/document_loaders/fs/pdf'
|
||||
import { DocxLoader } from 'langchain/document_loaders/fs/docx'
|
||||
import { TextLoader } from 'langchain/document_loaders/fs/text'
|
||||
import { TextSplitter } from 'langchain/text_splitter'
|
||||
|
||||
/**
 * File extensions that are loaded verbatim as plain text (source code, markup, etc.).
 */
const PLAIN_TEXT_EXTENSIONS = [
    '.txt',
    '.aspx',
    '.asp',
    '.cpp', // C++
    '.c',
    '.cs',
    '.css',
    '.go', // Go
    '.h', // C++ Header files
    '.kt', // Kotlin
    '.java', // Java
    '.js', // JavaScript
    '.less', // Less files
    '.ts', // TypeScript
    '.php', // PHP
    '.proto', // Protocol Buffers
    '.python', // Python
    '.py', // Python
    '.rst', // reStructuredText
    '.ruby', // Ruby
    '.rb', // Ruby
    '.rs', // Rust
    '.scala', // Scala
    '.sc', // Scala
    '.scss', // Sass
    '.sol', // Solidity
    '.sql', // SQL
    '.swift', // Swift
    '.markdown', // Markdown
    '.md', // Markdown
    '.tex', // LaTeX
    '.ltx', // LaTeX
    '.html', // HTML
    '.vb', // Visual Basic
    '.xml' // XML
]

/**
 * Extracts a printable message from a caught value of unknown type.
 */
function describeError(error: unknown): string {
    return error instanceof Error ? error.message : String(error)
}

/**
 * Splits the raw comma separated "Omit Metadata Keys" input into trimmed, non-empty keys.
 *
 * @param rawKeys - the user supplied string, possibly undefined or empty
 * @returns the list of keys (empty when nothing usable was supplied)
 */
function parseOmitMetadataKeys(rawKeys?: string): string[] {
    if (!rawKeys) return []
    return rawKeys
        .split(',')
        .map((key) => key.trim())
        .filter((key) => key.length > 0)
}

/**
 * Maps an S3 object key to the local file path it should be downloaded to.
 *
 * @param rootDir - directory that must contain every downloaded file
 * @param key - the S3 object key
 * @returns the local path, or `undefined` for keys ending in `/` (folder placeholder
 *          objects that have no file content to write)
 * @throws Error when the key would resolve to `rootDir` itself or to a location outside of it
 */
function resolveLocalPathForKey(rootDir: string, key: string): string | undefined {
    if (key.endsWith('/')) return undefined

    const root = path.resolve(rootDir)
    const target = path.join(root, key)
    const relative = path.relative(root, target)

    // Compare whole path segments so that a file literally named "..something" stays allowed
    const isOutsideRoot = relative === '' || relative === '..' || relative.startsWith(`..${path.sep}`) || path.isAbsolute(relative)
    if (isOutsideRoot) {
        throw new Error(`S3 key "${key}" does not resolve to a file inside the download directory`)
    }
    return target
}

/**
 * Collects a readable stream into a single Buffer.
 *
 * @param body - the value to read; anything other than a Node `Readable` is rejected
 * @returns a promise resolving to the concatenated stream content
 */
function readBodyToBuffer(body: unknown): Promise<Buffer> {
    return new Promise<Buffer>((resolve, reject) => {
        if (!(body instanceof Readable)) {
            reject(new Error('Response body is not a readable stream.'))
            return
        }
        const chunks: Buffer[] = []
        body.on('data', (chunk: Buffer) => chunks.push(chunk))
        body.on('end', () => resolve(Buffer.concat(chunks)))
        body.on('error', reject)
    })
}

/**
 * Lists every object key under the given prefix, following continuation tokens so that
 * results beyond the first response page are included.
 *
 * @param s3Client - client used to issue the list requests
 * @param bucketName - bucket to list
 * @param prefix - optional key prefix filter
 * @returns all keys that have both a key and an ETag
 */
async function listAllObjectKeys(s3Client: S3Client, bucketName: string, prefix?: string): Promise<string[]> {
    const keys: string[] = []
    let continuationToken: string | undefined

    do {
        const page: ListObjectsV2Output = await s3Client.send(
            new ListObjectsV2Command({
                Bucket: bucketName,
                Prefix: prefix || undefined,
                ContinuationToken: continuationToken
            })
        )
        for (const item of page.Contents ?? []) {
            if (item.Key && item.ETag) keys.push(item.Key)
        }
        continuationToken = page.IsTruncated ? page.NextContinuationToken : undefined
    } while (continuationToken)

    return keys
}

/**
 * Downloads a single S3 object into `rootDir`, recreating the key's folder structure.
 * Keys that do not map to a file (see resolveLocalPathForKey) are skipped.
 *
 * @throws Error naming the key and bucket when the download or the write fails
 */
async function downloadObjectToDirectory(s3Client: S3Client, bucketName: string, key: string, rootDir: string): Promise<void> {
    try {
        const filePath = resolveLocalPathForKey(rootDir, key)
        if (!filePath) return

        const response = await s3Client.send(
            new GetObjectCommand({
                Bucket: bucketName,
                Key: key
            })
        )
        const objectData = await readBodyToBuffer(response.Body)

        // create the directory if it doesnt already exist
        fsDefault.mkdirSync(path.dirname(filePath), { recursive: true })

        // write the file to the directory
        fsDefault.writeFileSync(filePath, objectData)
    } catch (e: unknown) {
        throw new Error(`Failed to download file ${key} from S3 bucket ${bucketName}: ${describeError(e)}`)
    }
}

/**
 * Builds the extension -> loader factory map handed to DirectoryLoader.
 *
 * @param pdfUsage - 'perFile' produces one document per PDF; any other value keeps one document per page
 */
function buildFileLoaders(pdfUsage: unknown): ConstructorParameters<typeof DirectoryLoader>[1] {
    const loaders: ConstructorParameters<typeof DirectoryLoader>[1] = {
        '.json': (filePath) => new JSONLoader(filePath),
        '.csv': (filePath) => new CSVLoader(filePath),
        '.docx': (filePath) => new DocxLoader(filePath),
        '.pdf': (filePath) =>
            // @ts-ignore
            new PDFLoader(filePath, { splitPages: pdfUsage !== 'perFile', pdfjs: () => import('pdf-parse/lib/pdf.js/v1.10.100/build/pdf.js') })
    }
    for (const extension of PLAIN_TEXT_EXTENSIONS) {
        loaders[extension] = (filePath) => new TextLoader(filePath)
    }
    return loaders
}

/**
 * Document loader node that downloads every object under an S3 bucket/prefix into a
 * temporary directory, loads the files with a DirectoryLoader and returns the documents.
 */
class S3_DocumentLoaders implements INode {
    label: string
    name: string
    version: number
    description: string
    type: string
    icon: string
    category: string
    baseClasses: string[]
    credential: INodeParams
    inputs?: INodeParams[]

    constructor() {
        this.label = 'S3 Directory'
        this.name = 's3Directory'
        this.version = 3.0
        this.type = 'Document'
        this.icon = 's3.svg'
        this.category = 'Document Loaders'
        this.description = 'Load Data from S3 Buckets'
        this.baseClasses = [this.type]
        this.credential = {
            label: 'Credential',
            name: 'credential',
            type: 'credential',
            credentialNames: ['awsApi'],
            optional: true
        }
        this.inputs = [
            {
                label: 'Text Splitter',
                name: 'textSplitter',
                type: 'TextSplitter',
                optional: true
            },
            {
                label: 'Bucket',
                name: 'bucketName',
                type: 'string'
            },
            {
                label: 'Region',
                name: 'region',
                type: 'asyncOptions',
                loadMethod: 'listRegions',
                default: 'us-east-1'
            },
            {
                label: 'Server URL',
                name: 'serverUrl',
                description:
                    'The fully qualified endpoint of the webservice. This is only for using a custom endpoint (for example, when using a local version of S3).',
                type: 'string',
                optional: true
            },
            {
                label: 'Prefix',
                name: 'prefix',
                type: 'string',
                description: 'Limits the response to keys that begin with the specified prefix',
                placeholder: 'TestFolder/Something',
                optional: true
            },
            {
                label: 'Pdf Usage',
                name: 'pdfUsage',
                type: 'options',
                options: [
                    {
                        label: 'One document per page',
                        name: 'perPage'
                    },
                    {
                        label: 'One document per file',
                        name: 'perFile'
                    }
                ],
                default: 'perPage',
                optional: true,
                additionalParams: true
            },
            {
                label: 'Additional Metadata',
                name: 'metadata',
                type: 'json',
                description: 'Additional metadata to be added to the extracted documents',
                optional: true,
                additionalParams: true
            },
            {
                label: 'Omit Metadata Keys',
                name: 'omitMetadataKeys',
                type: 'string',
                rows: 4,
                description:
                    'Each document loader comes with a default set of metadata keys that are extracted from the document. You can use this field to omit some of the default metadata keys. The value should be a list of keys, seperated by comma. Use * to omit all metadata keys execept the ones you specify in the Additional Metadata field',
                placeholder: 'key1, key2, key3.nestedKey1',
                optional: true,
                additionalParams: true
            }
        ]
    }

    loadMethods = {
        /** Supplies the options for the 'Region' input. */
        async listRegions(): Promise<INodeOptionsValue[]> {
            return await getRegions(MODEL_TYPE.CHAT, 'awsChatBedrock')
        }
    }

    /**
     * Downloads the configured bucket/prefix and turns the files into documents.
     *
     * @param nodeData - node configuration; reads the inputs declared in the constructor and the optional credential
     * @param _ - unused
     * @param options - passed through to getCredentialData
     * @returns the loaded (and optionally split) documents with their metadata adjusted
     * @throws Error prefixed with "Failed to load data from bucket" when listing, downloading or loading fails
     */
    async init(nodeData: INodeData, _: string, options: ICommonObject): Promise<any> {
        const textSplitter = nodeData.inputs?.textSplitter as TextSplitter
        const bucketName = nodeData.inputs?.bucketName as string
        const prefix = nodeData.inputs?.prefix as string
        const region = nodeData.inputs?.region as string
        const serverUrl = nodeData.inputs?.serverUrl as string
        const pdfUsage = nodeData.inputs?.pdfUsage
        const metadata = nodeData.inputs?.metadata
        const rawOmitMetadataKeys = nodeData.inputs?.omitMetadataKeys as string | undefined

        const omitMetadataKeys = parseOmitMetadataKeys(rawOmitMetadataKeys)
        // '*' drops every loader-provided key; only the additional metadata is kept
        const omitAllMetadata = rawOmitMetadataKeys?.trim() === '*'

        let credentials: S3ClientConfig['credentials'] | undefined
        if (nodeData.credential) {
            const credentialData = await getCredentialData(nodeData.credential, options)
            const accessKeyId = getCredentialParam('awsKey', credentialData, nodeData)
            const secretAccessKey = getCredentialParam('awsSecret', credentialData, nodeData)

            if (accessKeyId && secretAccessKey) {
                credentials = { accessKeyId, secretAccessKey }
            }
        }

        const s3Config: S3ClientConfig = {
            region,
            credentials,
            // a custom endpoint is addressed path-style
            ...(serverUrl ? { endpoint: serverUrl, forcePathStyle: true } : {})
        }

        const tempDir = fsDefault.mkdtempSync(path.join(os.tmpdir(), 's3fileloader-'))

        try {
            // Parse the user metadata first so that invalid JSON fails before anything is downloaded
            const parsedMetadata = metadata ? (typeof metadata === 'object' ? metadata : JSON.parse(metadata)) : {}

            const s3Client = new S3Client(s3Config)
            const keys = await listAllObjectKeys(s3Client, bucketName, prefix)

            // Wait for every download to settle before reacting to a failure: otherwise downloads
            // that are still in flight would recreate files after the temp directory was removed
            const downloads = await Promise.allSettled(keys.map((key) => downloadObjectToDirectory(s3Client, bucketName, key, tempDir)))
            for (const download of downloads) {
                if (download.status === 'rejected') throw download.reason
            }

            const loader = new DirectoryLoader(tempDir, buildFileLoaders(pdfUsage), true)
            const loadedDocs = textSplitter ? await loader.loadAndSplit(textSplitter) : await loader.load()

            return loadedDocs.map((doc) => ({
                ...doc,
                metadata: omitAllMetadata ? { ...parsedMetadata } : omit({ ...doc.metadata, ...parsedMetadata }, omitMetadataKeys)
            }))
        } catch (e: unknown) {
            throw new Error(`Failed to load data from bucket ${bucketName}: ${describeError(e)}`)
        } finally {
            // always remove the temp directory; `force` keeps a cleanup problem from masking the real error
            fsDefault.rmSync(tempDir, { recursive: true, force: true })
        }
    }
}
|
||||
// Expose the node class under the `nodeClass` key of this module's exports.
module.exports = { nodeClass: S3_DocumentLoaders }
|
||||
@@ -0,0 +1,5 @@
|
||||
<svg width="32" height="32" viewBox="0 0 32 32" fill="none" xmlns="http://www.w3.org/2000/svg">
|
||||
<path d="M27 8C27 10.7614 22.0751 13 16 13C9.92487 13 5 10.7614 5 8C5 5.23858 9.92487 3 16 3C22.0751 3 27 5.23858 27 8Z" fill="#64A939"/>
|
||||
<path d="M27 8C27 10.7614 22.0751 13 16 13C9.92487 13 5 10.7614 5 8M27 8C27 5.23858 22.0751 3 16 3C9.92487 3 5 5.23858 5 8M27 8L23 26C22.5 27.5 19.866 29 16 29C12.134 29 9.5 27.5 9 26L5 8" stroke="#64A939" stroke-width="2" stroke-linecap="round"/>
|
||||
<path d="M16 18C24.5 23.5 27.8093 22.2627 28.5 19.5C29 17.5 27.8333 14.5 26 14" stroke="#64A939" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"/>
|
||||
</svg>
|
||||
|
After Width: | Height: | Size: 652 B |
Reference in New Issue
Block a user