diff --git a/packages/components/nodes/documentloaders/Csv/Csv.ts b/packages/components/nodes/documentloaders/Csv/Csv.ts index ec7d1ef9..10186cf2 100644 --- a/packages/components/nodes/documentloaders/Csv/Csv.ts +++ b/packages/components/nodes/documentloaders/Csv/Csv.ts @@ -1,7 +1,6 @@ -import { omit } from 'lodash' import { TextSplitter } from 'langchain/text_splitter' -import { CSVLoader } from '@langchain/community/document_loaders/fs/csv' -import { getFileFromStorage, handleEscapeCharacters } from '../../../src' +import { CSVLoader } from './CsvLoader' +import { getFileFromStorage, handleDocumentLoaderDocuments, handleDocumentLoaderMetadata, handleDocumentLoaderOutput } from '../../../src' import { ICommonObject, IDocument, INode, INodeData, INodeOutputsValue, INodeParams } from '../../../src/Interface' class Csv_DocumentLoaders implements INode { @@ -19,7 +18,7 @@ class Csv_DocumentLoaders implements INode { constructor() { this.label = 'Csv File' this.name = 'csvFile' - this.version = 2.0 + this.version = 3.0 this.type = 'Document' this.icon = 'csv.svg' this.category = 'Document Loaders' @@ -82,21 +81,11 @@ class Csv_DocumentLoaders implements INode { ] } - async init(nodeData: INodeData, _: string, options: ICommonObject): Promise { - const textSplitter = nodeData.inputs?.textSplitter as TextSplitter + getFiles(nodeData: INodeData) { const csvFileBase64 = nodeData.inputs?.csvFile as string - const columnName = nodeData.inputs?.columnName as string - const metadata = nodeData.inputs?.metadata - const output = nodeData.outputs?.output as string - const _omitMetadataKeys = nodeData.inputs?.omitMetadataKeys as string - let omitMetadataKeys: string[] = [] - if (_omitMetadataKeys) { - omitMetadataKeys = _omitMetadataKeys.split(',').map((key) => key.trim()) - } - - let docs: IDocument[] = [] let files: string[] = [] + let fromStorage: boolean = true if (csvFileBase64.startsWith('FILE-STORAGE::')) { const fileName = csvFileBase64.replace('FILE-STORAGE::', '') @@ 
-105,21 +94,6 @@ class Csv_DocumentLoaders implements INode { } else { files = [fileName] } - const chatflowid = options.chatflowid - - for (const file of files) { - if (!file) continue - const fileData = await getFileFromStorage(file, chatflowid) - const blob = new Blob([fileData]) - const loader = new CSVLoader(blob, columnName.trim().length === 0 ? undefined : columnName.trim()) - - if (textSplitter) { - docs = await loader.load() - docs = await textSplitter.splitDocuments(docs) - } else { - docs.push(...(await loader.load())) - } - } } else { if (csvFileBase64.startsWith('[') && csvFileBase64.endsWith(']')) { files = JSON.parse(csvFileBase64) @@ -127,64 +101,49 @@ class Csv_DocumentLoaders implements INode { files = [csvFileBase64] } - for (const file of files) { - if (!file) continue - const splitDataURI = file.split(',') - splitDataURI.pop() - const bf = Buffer.from(splitDataURI.pop() || '', 'base64') - const blob = new Blob([bf]) - const loader = new CSVLoader(blob, columnName.trim().length === 0 ? undefined : columnName.trim()) - - if (textSplitter) { - docs = await loader.load() - docs = await textSplitter.splitDocuments(docs) - } else { - docs.push(...(await loader.load())) - } - } + fromStorage = false } - if (metadata) { - const parsedMetadata = typeof metadata === 'object' ? metadata : JSON.parse(metadata) - docs = docs.map((doc) => ({ - ...doc, - metadata: - _omitMetadataKeys === '*' - ? { - ...parsedMetadata - } - : omit( - { - ...doc.metadata, - ...parsedMetadata - }, - omitMetadataKeys - ) - })) + return { files, fromStorage } + } + + async getFileData(file: string, { chatflowid }: { chatflowid: string }, fromStorage?: boolean) { + if (fromStorage) { + return getFileFromStorage(file, chatflowid) } else { - docs = docs.map((doc) => ({ - ...doc, - metadata: - _omitMetadataKeys === '*' - ? 
{} - : omit( - { - ...doc.metadata - }, - omitMetadataKeys - ) - })) + const splitDataURI = file.split(',') + splitDataURI.pop() + return Buffer.from(splitDataURI.pop() || '', 'base64') + } + } + + async init(nodeData: INodeData, _: string, options: ICommonObject): Promise { + const textSplitter = nodeData.inputs?.textSplitter as TextSplitter + const columnName = nodeData.inputs?.columnName as string + const metadata = nodeData.inputs?.metadata + const output = nodeData.outputs?.output as string + const _omitMetadataKeys = nodeData.inputs?.omitMetadataKeys as string + + let docs: IDocument[] = [] + + const chatflowid = options.chatflowid + + const { files, fromStorage } = this.getFiles(nodeData) + + for (const file of files) { + if (!file) continue + + const fileData = await this.getFileData(file, { chatflowid }, fromStorage) + const blob = new Blob([fileData]) + const loader = new CSVLoader(blob, columnName.trim().length === 0 ? undefined : columnName.trim()) + + // use array spread instead of push(...), because push raises RangeError: Maximum call stack size exceeded when there are too many docs + docs = [...docs, ...(await handleDocumentLoaderDocuments(loader, textSplitter))] } - if (output === 'document') { - return docs - } else { - let finaltext = '' - for (const doc of docs) { - finaltext += `${doc.pageContent}\n` - } - return handleEscapeCharacters(finaltext, false) - } + docs = handleDocumentLoaderMetadata(docs, _omitMetadataKeys, metadata) + + return handleDocumentLoaderOutput(docs, output) } } diff --git a/packages/components/nodes/documentloaders/Csv/CsvLoader.ts b/packages/components/nodes/documentloaders/Csv/CsvLoader.ts new file mode 100644 index 00000000..35d4a1fc --- /dev/null +++ b/packages/components/nodes/documentloaders/Csv/CsvLoader.ts @@ -0,0 +1,74 @@ +import { TextLoader } from 'langchain/document_loaders/fs/text' +import Papa from 'papaparse' + +type CSVLoaderOptions = { + // Return specific column from key (string) or index (integer) + column?: string | number
+ // Force separator (default: auto detect) + separator?: string +} + +/** + * A class that extends the TextLoader class. It represents a document + * loader that loads documents from a CSV file. It has a constructor that + * takes a `filePathOrBlob` parameter representing the path to the CSV + * file or a Blob object, and an optional `options` parameter of type + * `CSVLoaderOptions` or a string representing the column to use as the + * document's pageContent. + */ +export class CSVLoader extends TextLoader { + protected options: CSVLoaderOptions = {} + + constructor(filePathOrBlob: ConstructorParameters[0], options?: CSVLoaderOptions | string) { + super(filePathOrBlob) + + if (typeof options === 'string') { + this.options = { column: options } + } else { + this.options = options ?? this.options + } + } + /** + * Parses the raw CSV data and returns an array of + * strings representing the pageContent of each document. It uses the + * `papaparse` library to parse the CSV data. If + * the `column` option is specified, it checks if the column exists in the + * CSV file and returns the values of that column as the pageContent. If + * the `column` option is not specified, it converts each row of the CSV + * data into key/value pairs and joins them with newline characters. + * @param raw The raw CSV data to be parsed. + * @returns An array of strings representing the pageContent of each document.
+ */ + async parse(raw: string): Promise { + const { column, separator } = this.options + + const { + data: parsed, + meta: { fields = [] } + } = Papa.parse<{ [K: string]: string }>(raw.trim(), { + delimiter: separator, + header: true + }) + + if (column !== undefined) { + if (!fields.length) { + throw new Error(`Unable to resolve fields from header.`) + } + + let searchIdx = column + + if (typeof column == 'number') { + searchIdx = fields[column] + } + + if (!fields.includes(searchIdx as string)) { + throw new Error(`Column ${column} not found in CSV file.`) + } + + // Note TextLoader will raise an exception if the value is null. + return parsed.map((row) => row[searchIdx]) + } + + return parsed.map((row) => fields.map((key) => `${key.trim() || '_0'}: ${row[key]?.trim()}`).join('\n')) + } +} diff --git a/packages/components/nodes/documentloaders/S3Directory/S3Directory.ts b/packages/components/nodes/documentloaders/S3Directory/S3Directory.ts index 238b8312..072822ae 100644 --- a/packages/components/nodes/documentloaders/S3Directory/S3Directory.ts +++ b/packages/components/nodes/documentloaders/S3Directory/S3Directory.ts @@ -1,6 +1,11 @@ -import { omit } from 'lodash' import { ICommonObject, INode, INodeData, INodeOptionsValue, INodeOutputsValue, INodeParams } from '../../../src/Interface' -import { getCredentialData, getCredentialParam, handleEscapeCharacters } from '../../../src/utils' +import { + getCredentialData, + getCredentialParam, + handleDocumentLoaderDocuments, + handleDocumentLoaderMetadata, + handleDocumentLoaderOutput +} from '../../../src/utils' import { S3Client, GetObjectCommand, S3ClientConfig, ListObjectsV2Command, ListObjectsV2Output } from '@aws-sdk/client-s3' import { getRegions, MODEL_TYPE } from '../../../src/modelLoader' import { Readable } from 'node:stream' @@ -10,12 +15,13 @@ import * as os from 'node:os' import { DirectoryLoader } from 'langchain/document_loaders/fs/directory' import { JSONLoader } from 
'langchain/document_loaders/fs/json' -import { CSVLoader } from '@langchain/community/document_loaders/fs/csv' import { PDFLoader } from '@langchain/community/document_loaders/fs/pdf' import { DocxLoader } from '@langchain/community/document_loaders/fs/docx' import { TextLoader } from 'langchain/document_loaders/fs/text' import { TextSplitter } from 'langchain/text_splitter' +import { CSVLoader } from '../Csv/CsvLoader' + class S3_DocumentLoaders implements INode { label: string name: string @@ -151,11 +157,6 @@ class S3_DocumentLoaders implements INode { const _omitMetadataKeys = nodeData.inputs?.omitMetadataKeys as string const output = nodeData.outputs?.output as string - let omitMetadataKeys: string[] = [] - if (_omitMetadataKeys) { - omitMetadataKeys = _omitMetadataKeys.split(',').map((key) => key.trim()) - } - let credentials: S3ClientConfig['credentials'] | undefined if (nodeData.credential) { @@ -241,11 +242,11 @@ class S3_DocumentLoaders implements INode { '.csv': (path) => new CSVLoader(path), '.docx': (path) => new DocxLoader(path), '.pdf': (path) => - pdfUsage === 'perFile' - ? 
// @ts-ignore - new PDFLoader(path, { splitPages: false, pdfjs: () => import('pdf-parse/lib/pdf.js/v1.10.100/build/pdf.js') }) - : // @ts-ignore - new PDFLoader(path, { pdfjs: () => import('pdf-parse/lib/pdf.js/v1.10.100/build/pdf.js') }), + new PDFLoader(path, { + splitPages: pdfUsage !== 'perFile', + // @ts-ignore + pdfjs: () => import('pdf-parse/lib/pdf.js/v1.10.100/build/pdf.js') + }), '.aspx': (path) => new TextLoader(path), '.asp': (path) => new TextLoader(path), '.cpp': (path) => new TextLoader(path), // C++ @@ -284,63 +285,16 @@ class S3_DocumentLoaders implements INode { true ) - let docs = [] + let docs = await handleDocumentLoaderDocuments(loader, textSplitter) - if (textSplitter) { - let splittedDocs = await loader.load() - splittedDocs = await textSplitter.splitDocuments(splittedDocs) - docs.push(...splittedDocs) - } else { - docs = await loader.load() - } - - if (metadata) { - const parsedMetadata = typeof metadata === 'object' ? metadata : JSON.parse(metadata) - docs = docs.map((doc) => ({ - ...doc, - metadata: - _omitMetadataKeys === '*' - ? { - ...parsedMetadata - } - : omit( - { - ...doc.metadata, - ...parsedMetadata - }, - omitMetadataKeys - ) - })) - } else { - docs = docs.map((doc) => ({ - ...doc, - metadata: - _omitMetadataKeys === '*' - ? 
{} - : omit( - { - ...doc.metadata - }, - omitMetadataKeys - ) - })) - } + docs = handleDocumentLoaderMetadata(docs, _omitMetadataKeys, metadata) + return handleDocumentLoaderOutput(docs, output) + } catch (e: any) { + throw new Error(`Failed to load data from bucket ${bucketName}: ${e.message}`) + } finally { // remove the temp directory before returning docs fsDefault.rmSync(tempDir, { recursive: true }) - - if (output === 'document') { - return docs - } else { - let finaltext = '' - for (const doc of docs) { - finaltext += `${doc.pageContent}\n` - } - return handleEscapeCharacters(finaltext, false) - } - } catch (e: any) { - fsDefault.rmSync(tempDir, { recursive: true }) - throw new Error(`Failed to load data from bucket ${bucketName}: ${e.message}`) } } } diff --git a/packages/components/nodes/documentloaders/S3File/S3File.ts b/packages/components/nodes/documentloaders/S3File/S3File.ts index 6b656de0..51c19804 100644 --- a/packages/components/nodes/documentloaders/S3File/S3File.ts +++ b/packages/components/nodes/documentloaders/S3File/S3File.ts @@ -1,4 +1,3 @@ -import { omit } from 'lodash' import { ICommonObject, INode, INodeData, INodeOptionsValue, INodeOutputsValue, INodeParams } from '../../../src/Interface' import { S3Loader } from '@langchain/community/document_loaders/web/s3' import { @@ -8,7 +7,13 @@ import { SkipInferTableTypes, HiResModelName } from '@langchain/community/document_loaders/fs/unstructured' -import { getCredentialData, getCredentialParam, handleEscapeCharacters } from '../../../src/utils' +import { + getCredentialData, + getCredentialParam, + handleDocumentLoaderDocuments, + handleDocumentLoaderMetadata, + handleDocumentLoaderOutput +} from '../../../src/utils' import { S3Client, GetObjectCommand, S3ClientConfig } from '@aws-sdk/client-s3' import { getRegions, MODEL_TYPE } from '../../../src/modelLoader' import { Readable } from 'node:stream' @@ -483,11 +488,6 @@ class S3_DocumentLoaders implements INode { const _omitMetadataKeys = 
nodeData.inputs?.omitMetadataKeys as string const output = nodeData.outputs?.output as string - let omitMetadataKeys: string[] = [] - if (_omitMetadataKeys) { - omitMetadataKeys = _omitMetadataKeys.split(',').map((key) => key.trim()) - } - let credentials: S3ClientConfig['credentials'] | undefined if (nodeData.credential) { @@ -572,56 +572,15 @@ class S3_DocumentLoaders implements INode { const unstructuredLoader = new UnstructuredLoader(filePath, obj) - let docs = await unstructuredLoader.load() + let docs = await handleDocumentLoaderDocuments(unstructuredLoader) - if (metadata) { - const parsedMetadata = typeof metadata === 'object' ? metadata : JSON.parse(metadata) - docs = docs.map((doc) => ({ - ...doc, - metadata: - _omitMetadataKeys === '*' - ? { - ...parsedMetadata - } - : omit( - { - ...doc.metadata, - ...parsedMetadata, - [sourceIdKey]: doc.metadata[sourceIdKey] || sourceIdKey - }, - omitMetadataKeys - ) - })) - } else { - docs = docs.map((doc) => ({ - ...doc, - metadata: - _omitMetadataKeys === '*' - ? 
{} - : omit( - { - ...doc.metadata, - [sourceIdKey]: doc.metadata[sourceIdKey] || sourceIdKey - }, - omitMetadataKeys - ) - })) - } + docs = handleDocumentLoaderMetadata(docs, _omitMetadataKeys, metadata, sourceIdKey) - fsDefault.rmSync(path.dirname(filePath), { recursive: true }) - - if (output === 'document') { - return docs - } else { - let finaltext = '' - for (const doc of docs) { - finaltext += `${doc.pageContent}\n` - } - return handleEscapeCharacters(finaltext, false) - } + return handleDocumentLoaderOutput(docs, output) } catch { - fsDefault.rmSync(path.dirname(filePath), { recursive: true }) throw new Error(`Failed to load file ${filePath} using unstructured loader.`) + } finally { + fsDefault.rmSync(path.dirname(filePath), { recursive: true }) } } diff --git a/packages/components/package.json b/packages/components/package.json index 543e8f7e..7e65a9ac 100644 --- a/packages/components/package.json +++ b/packages/components/package.json @@ -115,6 +115,7 @@ "object-hash": "^3.0.0", "ollama": "^0.5.11", "openai": "^4.57.3", + "papaparse": "^5.4.1", "pdf-parse": "^1.1.1", "pdfjs-dist": "^3.7.107", "pg": "^8.11.2", @@ -140,6 +141,7 @@ "@types/lodash": "^4.14.202", "@types/node-fetch": "2.6.2", "@types/object-hash": "^3.0.2", + "@types/papaparse": "^5.3.15", "@types/pg": "^8.10.2", "@types/ws": "^8.5.3", "babel-register": "^6.26.0", diff --git a/packages/components/src/utils.ts b/packages/components/src/utils.ts index 1c598477..ae91f612 100644 --- a/packages/components/src/utils.ts +++ b/packages/components/src/utils.ts @@ -7,10 +7,14 @@ import { z } from 'zod' import { DataSource } from 'typeorm' import { ICommonObject, IDatabaseEntity, IDocument, IMessage, INodeData, IVariable, MessageContentImageUrl } from './Interface' import { AES, enc } from 'crypto-js' +import { omit } from 'lodash' import { AIMessage, HumanMessage, BaseMessage } from '@langchain/core/messages' +import { Document } from '@langchain/core/documents' import { getFileFromStorage } from 
'./storageUtils' import { GetSecretValueCommand, SecretsManagerClient, SecretsManagerClientConfig } from '@aws-sdk/client-secrets-manager' import { customGet } from '../nodes/sequentialagents/commonUtils' +import { TextSplitter } from 'langchain/text_splitter' +import { DocumentLoader } from 'langchain/document_loaders/base' export const numberOrExpressionRegex = '^(\\d+\\.?\\d*|{{.*}})$' //return true if string consists only numbers OR expression {{}} export const notEmptyRegex = '(.|\\s)*\\S(.|\\s)*' //return true if string is not empty or blank @@ -1077,3 +1081,68 @@ export const resolveFlowObjValue = (obj: any, sourceObj: any): any => { return obj } } + +export const handleDocumentLoaderOutput = (docs: Document[], output: string) => { + if (output === 'document') { + return docs + } else { + let finaltext = '' + for (const doc of docs) { + finaltext += `${doc.pageContent}\n` + } + return handleEscapeCharacters(finaltext, false) + } +} + +export const parseDocumentLoaderMetadata = (metadata: object | string): object => { + if (!metadata) return {} + + if (typeof metadata !== 'object') { + return JSON.parse(metadata) + } + + return metadata +} + +export const handleDocumentLoaderMetadata = ( + docs: Document[], + _omitMetadataKeys: string, + metadata: object | string = {}, + sourceIdKey?: string +) => { + let omitMetadataKeys: string[] = [] + if (_omitMetadataKeys) { + omitMetadataKeys = _omitMetadataKeys.split(',').map((key) => key.trim()) + } + + metadata = parseDocumentLoaderMetadata(metadata) + + return docs.map((doc) => ({ + ...doc, + metadata: + _omitMetadataKeys === '*' + ? metadata + : omit( + { + ...metadata, + ...doc.metadata, + ...(sourceIdKey ? 
{ [sourceIdKey]: doc.metadata[sourceIdKey] || sourceIdKey } : undefined) + }, + omitMetadataKeys + ) + })) +} + +export const handleDocumentLoaderDocuments = async (loader: DocumentLoader, textSplitter?: TextSplitter) => { + let docs: Document[] = [] + + if (textSplitter) { + let splittedDocs = await loader.load() + splittedDocs = await textSplitter.splitDocuments(splittedDocs) + docs = splittedDocs + } else { + docs = await loader.load() + } + + return docs +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index bb8ec0a7..0c6911f6 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -388,6 +388,9 @@ importers: openai: specifier: 4.57.3 version: 4.57.3(encoding@0.1.13)(zod@3.22.4) + papaparse: + specifier: ^5.4.1 + version: 5.4.1 pdf-parse: specifier: ^1.1.1 version: 1.1.1 @@ -458,6 +461,9 @@ importers: '@types/object-hash': specifier: ^3.0.2 version: 3.0.6 + '@types/papaparse': + specifier: ^5.3.15 + version: 5.3.15 '@types/pg': specifier: ^8.10.2 version: 8.11.2 @@ -6365,8 +6371,8 @@ packages: '@types/object-hash@3.0.6': resolution: { integrity: sha512-fOBV8C1FIu2ELinoILQ+ApxcUKz4ngq+IWUYrxSGjXzzjUALijilampwkMgEtJ+h2njAW3pi853QpzNVCHB73w== } - '@types/papaparse@5.3.14': - resolution: { integrity: sha512-LxJ4iEFcpqc6METwp9f6BV6VVc43m6MfH0VqFosHvrUgfXiFe6ww7R3itkOQ+TCK6Y+Iv/+RnnvtRZnkc5Kc9g== } + '@types/papaparse@5.3.15': + resolution: { integrity: sha512-JHe6vF6x/8Z85nCX4yFdDslN11d+1pr12E526X8WAfhadOeaOTx5AuIkvDKIBopfvlzpzkdMx4YyvSKCM9oqtw== } '@types/parse-json@4.0.2': resolution: { integrity: sha512-dISoDXWWQwUquiKsyZ4Ng+HX2KsPL7LyHKHQwgGFEA3IaKac4Obd+h2a/a6waisAoepJlBcx9paWqjA8/HVjCw== } @@ -24808,7 +24814,7 @@ snapshots: '@types/object-hash@3.0.6': {} - '@types/papaparse@5.3.14': + '@types/papaparse@5.3.15': dependencies: '@types/node': 20.12.12 @@ -31514,7 +31520,7 @@ snapshots: '@pinecone-database/pinecone': 2.2.2 '@qdrant/js-client-rest': 1.9.0(typescript@5.5.2) '@types/lodash': 4.17.4 - '@types/papaparse': 5.3.14 + '@types/papaparse': 5.3.15 '@types/pg': 
8.11.6 '@xenova/transformers': 2.17.1 '@zilliz/milvus2-sdk-node': 2.4.2