AWS Bedrock embeddings call retry extension (#2917)

* AWS Bedrock embeddings call retry extension

* Removed unnecessary commented line

---------

Co-authored-by: Rok Pajk Kosec <rok.pajkkosec@gmail.com>
This commit is contained in:
Cosso
2024-08-02 23:49:15 +02:00
committed by GitHub
parent 99ef9214c7
commit edbaa539df
@@ -83,6 +83,24 @@ class AWSBedrockEmbedding_Embeddings implements INode {
}
],
optional: true
},
{
label: 'Batch Size',
name: 'batchSize',
description: 'Documents batch size to send to AWS API for Titan model embeddings. Used to avoid throttling.',
type: 'number',
optional: true,
default: 50,
additionalParams: true
},
{
label: 'Max AWS API retries',
name: 'maxRetries',
description: 'This will limit the nubmer of AWS API for Titan model embeddings call retries. Used to avoid throttling.',
type: 'number',
optional: true,
default: 5,
additionalParams: true
}
]
}
@@ -144,7 +162,9 @@ class AWSBedrockEmbedding_Embeddings implements INode {
if (iModel.startsWith('cohere')) {
return await embedTextCohere(documents, client, iModel, inputType)
} else {
return Promise.all(documents.map((document) => embedTextTitan(document, client, iModel)))
const batchSize = nodeData.inputs?.batchSize as number
const maxRetries = nodeData.inputs?.maxRetries as number
return processInBatches(documents, batchSize, maxRetries, (document) => embedTextTitan(document, client, iModel))
}
}
return model
@@ -195,4 +215,38 @@ const embedTextCohere = async (texts: string[], client: BedrockRuntimeClient, mo
}
}
const processInBatches = async (
documents: string[],
batchSize: number,
maxRetries: number,
processFunc: (document: string) => Promise<number[]>
): Promise<number[][]> => {
let sleepTime = 0
let retryCounter = 0
let result: number[][] = []
for (let i = 0; i < documents.length; i += batchSize) {
let chunk = documents.slice(i, i + batchSize)
try {
let chunkResult = await Promise.all(chunk.map(processFunc))
result.push(...chunkResult)
retryCounter = 0
} catch (e) {
if (retryCounter < maxRetries && e.name.includes('ThrottlingException')) {
retryCounter = retryCounter + 1
i = i - batchSize
sleepTime = sleepTime + 100
} else {
// Split to distinguish between throttling retry error and other errors in trance
if (e.name.includes('ThrottlingException')) {
throw new Error('AWS Bedrock retry limit reached: ' + e)
} else {
throw new Error(e)
}
}
}
await new Promise((resolve) => setTimeout(resolve, sleepTime))
}
return result
}
module.exports = { nodeClass: AWSBedrockEmbedding_Embeddings }