Bugfix/Remove postgres vector store data when deletion (#5536)

Remove postgres vector store data when deletion

- Introduced a new `doc_id` column in MySQL, Postgres, and SQLite record managers to support document identification.
- Updated the `update` method to handle both string and object formats for keys, allowing for better flexibility in document updates.
- Enhanced `listKeys` method to filter by `doc_id` when provided in options.
- Updated vector store integrations to utilize the new `doc_id` filtering capability
This commit is contained in:
Henry Heng
2025-11-30 12:01:36 +00:00
committed by GitHub
parent e6e0c2d07b
commit 465005a503
20 changed files with 620 additions and 217 deletions
@@ -51,7 +51,6 @@ class SQLiteRecordManager_RecordManager implements INode {
label: 'Namespace',
name: 'namespace',
type: 'string',
description: 'If not specified, chatflowid will be used',
additionalParams: true,
optional: true
},
@@ -198,6 +197,15 @@ CREATE INDEX IF NOT EXISTS key_index ON "${tableName}" (key);
CREATE INDEX IF NOT EXISTS namespace_index ON "${tableName}" (namespace);
CREATE INDEX IF NOT EXISTS group_id_index ON "${tableName}" (group_id);`)
// Add doc_id column if it doesn't exist (migration for existing tables)
const checkColumn = await queryRunner.manager.query(
`SELECT COUNT(*) as count FROM pragma_table_info('${tableName}') WHERE name='doc_id';`
)
if (checkColumn[0].count === 0) {
await queryRunner.manager.query(`ALTER TABLE "${tableName}" ADD COLUMN doc_id TEXT;`)
await queryRunner.manager.query(`CREATE INDEX IF NOT EXISTS doc_id_index ON "${tableName}" (doc_id);`)
}
await queryRunner.release()
} catch (e: any) {
// This error indicates that the table already exists
@@ -228,7 +236,7 @@ CREATE INDEX IF NOT EXISTS group_id_index ON "${tableName}" (group_id);`)
}
}
async update(keys: string[], updateOptions?: UpdateOptions): Promise<void> {
async update(keys: Array<{ uid: string; docId: string }> | string[], updateOptions?: UpdateOptions): Promise<void> {
if (keys.length === 0) {
return
}
@@ -243,23 +251,23 @@ CREATE INDEX IF NOT EXISTS group_id_index ON "${tableName}" (group_id);`)
throw new Error(`Time sync issue with database ${updatedAt} < ${timeAtLeast}`)
}
const groupIds = _groupIds ?? keys.map(() => null)
// Handle both new format (objects with uid and docId) and old format (strings)
const isNewFormat = keys.length > 0 && typeof keys[0] === 'object' && 'uid' in keys[0]
const keyStrings = isNewFormat ? (keys as Array<{ uid: string; docId: string }>).map((k) => k.uid) : (keys as string[])
const docIds = isNewFormat ? (keys as Array<{ uid: string; docId: string }>).map((k) => k.docId) : keys.map(() => null)
if (groupIds.length !== keys.length) {
throw new Error(`Number of keys (${keys.length}) does not match number of group_ids (${groupIds.length})`)
const groupIds = _groupIds ?? keyStrings.map(() => null)
if (groupIds.length !== keyStrings.length) {
throw new Error(`Number of keys (${keyStrings.length}) does not match number of group_ids (${groupIds.length})`)
}
const recordsToUpsert = keys.map((key, i) => [
key,
this.namespace,
updatedAt,
groupIds[i] ?? null // Ensure groupIds[i] is null if undefined
])
const recordsToUpsert = keyStrings.map((key, i) => [key, this.namespace, updatedAt, groupIds[i] ?? null, docIds[i] ?? null])
const query = `
INSERT INTO "${tableName}" (key, namespace, updated_at, group_id)
VALUES (?, ?, ?, ?)
ON CONFLICT (key, namespace) DO UPDATE SET updated_at = excluded.updated_at`
INSERT INTO "${tableName}" (key, namespace, updated_at, group_id, doc_id)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT (key, namespace) DO UPDATE SET updated_at = excluded.updated_at, doc_id = excluded.doc_id`
try {
// To handle multiple files upsert
@@ -314,8 +322,8 @@ CREATE INDEX IF NOT EXISTS group_id_index ON "${tableName}" (group_id);`)
}
}
async listKeys(options?: ListKeyOptions): Promise<string[]> {
const { before, after, limit, groupIds } = options ?? {}
async listKeys(options?: ListKeyOptions & { docId?: string }): Promise<string[]> {
const { before, after, limit, groupIds, docId } = options ?? {}
const tableName = this.sanitizeTableName(this.tableName)
let query = `SELECT key FROM "${tableName}" WHERE namespace = ?`
@@ -344,6 +352,11 @@ CREATE INDEX IF NOT EXISTS group_id_index ON "${tableName}" (group_id);`)
values.push(...groupIds.filter((gid): gid is string => gid !== null))
}
if (docId) {
query += ` AND doc_id = ?`
values.push(docId)
}
query += ';'
const dataSource = await this.getDataSource()