[Feature] improve CsvLoader & clean code (#3830)

* Improve CSV Loader

* Improve S3 Loaders

---------

Co-authored-by: Henry <hzj94@hotmail.com>
This commit is contained in:
Jérémy JOURDIN
2025-01-14 17:47:04 +01:00
committed by GitHub
parent cc87d85675
commit 24eb437bad
7 changed files with 230 additions and 207 deletions
@@ -1,7 +1,6 @@
import { omit } from 'lodash'
import { TextSplitter } from 'langchain/text_splitter'
import { CSVLoader } from '@langchain/community/document_loaders/fs/csv'
import { getFileFromStorage, handleEscapeCharacters } from '../../../src'
import { CSVLoader } from './CsvLoader'
import { getFileFromStorage, handleDocumentLoaderDocuments, handleDocumentLoaderMetadata, handleDocumentLoaderOutput } from '../../../src'
import { ICommonObject, IDocument, INode, INodeData, INodeOutputsValue, INodeParams } from '../../../src/Interface'
class Csv_DocumentLoaders implements INode {
@@ -19,7 +18,7 @@ class Csv_DocumentLoaders implements INode {
constructor() {
this.label = 'Csv File'
this.name = 'csvFile'
this.version = 2.0
this.version = 3.0
this.type = 'Document'
this.icon = 'csv.svg'
this.category = 'Document Loaders'
@@ -82,21 +81,11 @@ class Csv_DocumentLoaders implements INode {
]
}
async init(nodeData: INodeData, _: string, options: ICommonObject): Promise<any> {
const textSplitter = nodeData.inputs?.textSplitter as TextSplitter
getFiles(nodeData: INodeData) {
const csvFileBase64 = nodeData.inputs?.csvFile as string
const columnName = nodeData.inputs?.columnName as string
const metadata = nodeData.inputs?.metadata
const output = nodeData.outputs?.output as string
const _omitMetadataKeys = nodeData.inputs?.omitMetadataKeys as string
let omitMetadataKeys: string[] = []
if (_omitMetadataKeys) {
omitMetadataKeys = _omitMetadataKeys.split(',').map((key) => key.trim())
}
let docs: IDocument[] = []
let files: string[] = []
let fromStorage: boolean = true
if (csvFileBase64.startsWith('FILE-STORAGE::')) {
const fileName = csvFileBase64.replace('FILE-STORAGE::', '')
@@ -105,21 +94,6 @@ class Csv_DocumentLoaders implements INode {
} else {
files = [fileName]
}
const chatflowid = options.chatflowid
for (const file of files) {
if (!file) continue
const fileData = await getFileFromStorage(file, chatflowid)
const blob = new Blob([fileData])
const loader = new CSVLoader(blob, columnName.trim().length === 0 ? undefined : columnName.trim())
if (textSplitter) {
docs = await loader.load()
docs = await textSplitter.splitDocuments(docs)
} else {
docs.push(...(await loader.load()))
}
}
} else {
if (csvFileBase64.startsWith('[') && csvFileBase64.endsWith(']')) {
files = JSON.parse(csvFileBase64)
@@ -127,64 +101,49 @@ class Csv_DocumentLoaders implements INode {
files = [csvFileBase64]
}
for (const file of files) {
if (!file) continue
const splitDataURI = file.split(',')
splitDataURI.pop()
const bf = Buffer.from(splitDataURI.pop() || '', 'base64')
const blob = new Blob([bf])
const loader = new CSVLoader(blob, columnName.trim().length === 0 ? undefined : columnName.trim())
if (textSplitter) {
docs = await loader.load()
docs = await textSplitter.splitDocuments(docs)
} else {
docs.push(...(await loader.load()))
}
}
fromStorage = false
}
if (metadata) {
const parsedMetadata = typeof metadata === 'object' ? metadata : JSON.parse(metadata)
docs = docs.map((doc) => ({
...doc,
metadata:
_omitMetadataKeys === '*'
? {
...parsedMetadata
}
: omit(
{
...doc.metadata,
...parsedMetadata
},
omitMetadataKeys
)
}))
return { files, fromStorage }
}
async getFileData(file: string, { chatflowid }: { chatflowid: string }, fromStorage?: boolean) {
if (fromStorage) {
return getFileFromStorage(file, chatflowid)
} else {
docs = docs.map((doc) => ({
...doc,
metadata:
_omitMetadataKeys === '*'
? {}
: omit(
{
...doc.metadata
},
omitMetadataKeys
)
}))
const splitDataURI = file.split(',')
splitDataURI.pop()
return Buffer.from(splitDataURI.pop() || '', 'base64')
}
}
async init(nodeData: INodeData, _: string, options: ICommonObject): Promise<any> {
const textSplitter = nodeData.inputs?.textSplitter as TextSplitter
const columnName = nodeData.inputs?.columnName as string
const metadata = nodeData.inputs?.metadata
const output = nodeData.outputs?.output as string
const _omitMetadataKeys = nodeData.inputs?.omitMetadataKeys as string
let docs: IDocument[] = []
const chatflowid = options.chatflowid
const { files, fromStorage } = this.getFiles(nodeData)
for (const file of files) {
if (!file) continue
const fileData = await this.getFileData(file, { chatflowid }, fromStorage)
const blob = new Blob([fileData])
const loader = new CSVLoader(blob, columnName.trim().length === 0 ? undefined : columnName.trim())
// use spread instead of push, because it raises RangeError: Maximum call stack size exceeded when too many docs
docs = [...docs, ...(await handleDocumentLoaderDocuments(loader, textSplitter))]
}
if (output === 'document') {
return docs
} else {
let finaltext = ''
for (const doc of docs) {
finaltext += `${doc.pageContent}\n`
}
return handleEscapeCharacters(finaltext, false)
}
docs = handleDocumentLoaderMetadata(docs, _omitMetadataKeys, metadata)
return handleDocumentLoaderOutput(docs, output)
}
}
@@ -0,0 +1,74 @@
import { TextLoader } from 'langchain/document_loaders/fs/text'
import Papa from 'papaparse'
type CSVLoaderOptions = {
// Return specifific column from key (string) or index (integer)
column?: string | number
// Force separator (default: auto detect)
separator?: string
}
/**
* A class that extends the TextLoader class. It represents a document
* loader that loads documents from a CSV file. It has a constructor that
* takes a `filePathOrBlob` parameter representing the path to the CSV
* file or a Blob object, and an optional `options` parameter of type
* `CSVLoaderOptions` or a string representing the column to use as the
* document's pageContent.
*/
export class CSVLoader extends TextLoader {
protected options: CSVLoaderOptions = {}
constructor(filePathOrBlob: ConstructorParameters<typeof TextLoader>[0], options?: CSVLoaderOptions | string) {
super(filePathOrBlob)
if (typeof options === 'string') {
this.options = { column: options }
} else {
this.options = options ?? this.options
}
}
/**
* A protected method that parses the raw CSV data and returns an array of
* strings representing the pageContent of each document. It uses the
* `papaparse` to parse the CSV data. If
* the `column` option is specified, it checks if the column exists in the
* CSV file and returns the values of that column as the pageContent. If
* the `column` option is not specified, it converts each row of the CSV
* data into key/value pairs and joins them with newline characters.
* @param raw The raw CSV data to be parsed.
* @returns An array of strings representing the pageContent of each document.
*/
async parse(raw: string): Promise<string[]> {
const { column, separator } = this.options
const {
data: parsed,
meta: { fields = [] }
} = Papa.parse<{ [K: string]: string }>(raw.trim(), {
delimiter: separator,
header: true
})
if (column !== undefined) {
if (!fields.length) {
throw new Error(`Unable to resolve fields from header.`)
}
let searchIdx = column
if (typeof column == 'number') {
searchIdx = fields[column]
}
if (!fields.includes(searchIdx as string)) {
throw new Error(`Column ${column} not found in CSV file.`)
}
// Note TextLoader will raise an exception if the value is null.
return parsed.map((row) => row[searchIdx])
}
return parsed.map((row) => fields.map((key) => `${key.trim() || '_0'}: ${row[key]?.trim()}`).join('\n'))
}
}
@@ -1,6 +1,11 @@
import { omit } from 'lodash'
import { ICommonObject, INode, INodeData, INodeOptionsValue, INodeOutputsValue, INodeParams } from '../../../src/Interface'
import { getCredentialData, getCredentialParam, handleEscapeCharacters } from '../../../src/utils'
import {
getCredentialData,
getCredentialParam,
handleDocumentLoaderDocuments,
handleDocumentLoaderMetadata,
handleDocumentLoaderOutput
} from '../../../src/utils'
import { S3Client, GetObjectCommand, S3ClientConfig, ListObjectsV2Command, ListObjectsV2Output } from '@aws-sdk/client-s3'
import { getRegions, MODEL_TYPE } from '../../../src/modelLoader'
import { Readable } from 'node:stream'
@@ -10,12 +15,13 @@ import * as os from 'node:os'
import { DirectoryLoader } from 'langchain/document_loaders/fs/directory'
import { JSONLoader } from 'langchain/document_loaders/fs/json'
import { CSVLoader } from '@langchain/community/document_loaders/fs/csv'
import { PDFLoader } from '@langchain/community/document_loaders/fs/pdf'
import { DocxLoader } from '@langchain/community/document_loaders/fs/docx'
import { TextLoader } from 'langchain/document_loaders/fs/text'
import { TextSplitter } from 'langchain/text_splitter'
import { CSVLoader } from '../Csv/CsvLoader'
class S3_DocumentLoaders implements INode {
label: string
name: string
@@ -151,11 +157,6 @@ class S3_DocumentLoaders implements INode {
const _omitMetadataKeys = nodeData.inputs?.omitMetadataKeys as string
const output = nodeData.outputs?.output as string
let omitMetadataKeys: string[] = []
if (_omitMetadataKeys) {
omitMetadataKeys = _omitMetadataKeys.split(',').map((key) => key.trim())
}
let credentials: S3ClientConfig['credentials'] | undefined
if (nodeData.credential) {
@@ -241,11 +242,11 @@ class S3_DocumentLoaders implements INode {
'.csv': (path) => new CSVLoader(path),
'.docx': (path) => new DocxLoader(path),
'.pdf': (path) =>
pdfUsage === 'perFile'
? // @ts-ignore
new PDFLoader(path, { splitPages: false, pdfjs: () => import('pdf-parse/lib/pdf.js/v1.10.100/build/pdf.js') })
: // @ts-ignore
new PDFLoader(path, { pdfjs: () => import('pdf-parse/lib/pdf.js/v1.10.100/build/pdf.js') }),
new PDFLoader(path, {
splitPages: pdfUsage !== 'perFile',
// @ts-ignore
pdfjs: () => import('pdf-parse/lib/pdf.js/v1.10.100/build/pdf.js')
}),
'.aspx': (path) => new TextLoader(path),
'.asp': (path) => new TextLoader(path),
'.cpp': (path) => new TextLoader(path), // C++
@@ -284,63 +285,16 @@ class S3_DocumentLoaders implements INode {
true
)
let docs = []
let docs = await handleDocumentLoaderDocuments(loader, textSplitter)
if (textSplitter) {
let splittedDocs = await loader.load()
splittedDocs = await textSplitter.splitDocuments(splittedDocs)
docs.push(...splittedDocs)
} else {
docs = await loader.load()
}
if (metadata) {
const parsedMetadata = typeof metadata === 'object' ? metadata : JSON.parse(metadata)
docs = docs.map((doc) => ({
...doc,
metadata:
_omitMetadataKeys === '*'
? {
...parsedMetadata
}
: omit(
{
...doc.metadata,
...parsedMetadata
},
omitMetadataKeys
)
}))
} else {
docs = docs.map((doc) => ({
...doc,
metadata:
_omitMetadataKeys === '*'
? {}
: omit(
{
...doc.metadata
},
omitMetadataKeys
)
}))
}
docs = handleDocumentLoaderMetadata(docs, _omitMetadataKeys, metadata)
return handleDocumentLoaderOutput(docs, output)
} catch (e: any) {
throw new Error(`Failed to load data from bucket ${bucketName}: ${e.message}`)
} finally {
// remove the temp directory before returning docs
fsDefault.rmSync(tempDir, { recursive: true })
if (output === 'document') {
return docs
} else {
let finaltext = ''
for (const doc of docs) {
finaltext += `${doc.pageContent}\n`
}
return handleEscapeCharacters(finaltext, false)
}
} catch (e: any) {
fsDefault.rmSync(tempDir, { recursive: true })
throw new Error(`Failed to load data from bucket ${bucketName}: ${e.message}`)
}
}
}
@@ -1,4 +1,3 @@
import { omit } from 'lodash'
import { ICommonObject, INode, INodeData, INodeOptionsValue, INodeOutputsValue, INodeParams } from '../../../src/Interface'
import { S3Loader } from '@langchain/community/document_loaders/web/s3'
import {
@@ -8,7 +7,13 @@ import {
SkipInferTableTypes,
HiResModelName
} from '@langchain/community/document_loaders/fs/unstructured'
import { getCredentialData, getCredentialParam, handleEscapeCharacters } from '../../../src/utils'
import {
getCredentialData,
getCredentialParam,
handleDocumentLoaderDocuments,
handleDocumentLoaderMetadata,
handleDocumentLoaderOutput
} from '../../../src/utils'
import { S3Client, GetObjectCommand, S3ClientConfig } from '@aws-sdk/client-s3'
import { getRegions, MODEL_TYPE } from '../../../src/modelLoader'
import { Readable } from 'node:stream'
@@ -483,11 +488,6 @@ class S3_DocumentLoaders implements INode {
const _omitMetadataKeys = nodeData.inputs?.omitMetadataKeys as string
const output = nodeData.outputs?.output as string
let omitMetadataKeys: string[] = []
if (_omitMetadataKeys) {
omitMetadataKeys = _omitMetadataKeys.split(',').map((key) => key.trim())
}
let credentials: S3ClientConfig['credentials'] | undefined
if (nodeData.credential) {
@@ -572,56 +572,15 @@ class S3_DocumentLoaders implements INode {
const unstructuredLoader = new UnstructuredLoader(filePath, obj)
let docs = await unstructuredLoader.load()
let docs = await handleDocumentLoaderDocuments(unstructuredLoader)
if (metadata) {
const parsedMetadata = typeof metadata === 'object' ? metadata : JSON.parse(metadata)
docs = docs.map((doc) => ({
...doc,
metadata:
_omitMetadataKeys === '*'
? {
...parsedMetadata
}
: omit(
{
...doc.metadata,
...parsedMetadata,
[sourceIdKey]: doc.metadata[sourceIdKey] || sourceIdKey
},
omitMetadataKeys
)
}))
} else {
docs = docs.map((doc) => ({
...doc,
metadata:
_omitMetadataKeys === '*'
? {}
: omit(
{
...doc.metadata,
[sourceIdKey]: doc.metadata[sourceIdKey] || sourceIdKey
},
omitMetadataKeys
)
}))
}
docs = handleDocumentLoaderMetadata(docs, _omitMetadataKeys, metadata, sourceIdKey)
fsDefault.rmSync(path.dirname(filePath), { recursive: true })
if (output === 'document') {
return docs
} else {
let finaltext = ''
for (const doc of docs) {
finaltext += `${doc.pageContent}\n`
}
return handleEscapeCharacters(finaltext, false)
}
return handleDocumentLoaderOutput(docs, output)
} catch {
fsDefault.rmSync(path.dirname(filePath), { recursive: true })
throw new Error(`Failed to load file ${filePath} using unstructured loader.`)
} finally {
fsDefault.rmSync(path.dirname(filePath), { recursive: true })
}
}